hopsworks.core.kafka_api #
[source] KafkaApi #
For backwards compatibility, hopsworks.core.kafka_api.KafkaApi is still available as hsfs.core.kafka_api.KafkaApi. Use of this alias is discouraged because it is slated for deprecation.
[source] create_topic #
create_topic(
name: str,
schema: str,
schema_version: int,
replicas: int = 1,
partitions: int = 1,
) -> kafka_topic.KafkaTopic
Create a new Kafka topic.
import hopsworks
project = hopsworks.login()
kafka_api = project.get_kafka_api()
kafka_topic = kafka_api.create_topic("my_topic", "my_schema", 1)
| PARAMETER | DESCRIPTION |
|---|---|
| name | Name of the topic. TYPE: str |
| schema | Subject name of the schema. TYPE: str |
| schema_version | Version of the schema. TYPE: int |
| replicas | Replication factor for the topic. TYPE: int DEFAULT: 1 |
| partitions | Number of partitions for the topic. TYPE: int DEFAULT: 1 |

| RETURNS | DESCRIPTION |
|---|---|
| kafka_topic.KafkaTopic | The KafkaTopic object. |

| RAISES | DESCRIPTION |
|---|---|
| hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
[source] create_schema #
create_schema(
subject: str, schema: dict
) -> kafka_schema.KafkaSchema
Create a new Kafka schema.
import hopsworks
project = hopsworks.login()
kafka_api = project.get_kafka_api()
avro_schema = {
    "type": "record",
    "name": "tutorial",
    "fields": [
        {
            "name": "id",
            "type": "int"
        },
        {
            "name": "data",
            "type": "string"
        }
    ]
}
kafka_schema = kafka_api.create_schema("my_schema", avro_schema)
| PARAMETER | DESCRIPTION |
|---|---|
| subject | Subject name of the schema. TYPE: str |
| schema | Avro schema definition. TYPE: dict |

| RETURNS | DESCRIPTION |
|---|---|
| kafka_schema.KafkaSchema | The KafkaSchema object. |

| RAISES | DESCRIPTION |
|---|---|
| hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
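
Since create_topic refers to a schema by subject name and version, a common flow is to register the schema first and then create the topic against it. The following is a minimal sketch of that flow, not part of the library: the helper name register_schema_and_topic is hypothetical, and it assumes the returned KafkaSchema exposes a version attribute.

```python
def register_schema_and_topic(kafka_api, topic_name, subject, avro_schema,
                              replicas=1, partitions=1):
    """Register an Avro schema, then create a topic that uses it.

    `kafka_api` is expected to behave like KafkaApi (create_schema, create_topic).
    """
    # Register the schema under the given subject.
    kafka_schema = kafka_api.create_schema(subject, avro_schema)

    # Assumption: the KafkaSchema object exposes the registered version as `.version`.
    kafka_topic = kafka_api.create_topic(
        topic_name,
        subject,
        kafka_schema.version,
        replicas=replicas,
        partitions=partitions,
    )
    return kafka_schema, kafka_topic
```

With a real project, kafka_api would come from project.get_kafka_api() as in the examples above.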
[source] get_topic #
get_topic(name: str) -> kafka_topic.KafkaTopic | None
Get a Kafka topic by name.
| PARAMETER | DESCRIPTION |
|---|---|
| name | Name of the topic. TYPE: str |

| RETURNS | DESCRIPTION |
|---|---|
| kafka_topic.KafkaTopic \| None | The KafkaTopic object or None. |

| RAISES | DESCRIPTION |
|---|---|
| hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
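
Because get_topic can return None, it can be combined with create_topic into a get-or-create pattern. The sketch below is illustrative only: get_or_create_topic is a hypothetical helper, and it assumes None is returned when no topic with the given name exists.

```python
def get_or_create_topic(kafka_api, name, schema, schema_version,
                        replicas=1, partitions=1):
    """Return the existing topic, or create it if get_topic returns None.

    `kafka_api` is expected to behave like KafkaApi (get_topic, create_topic).
    """
    kafka_topic = kafka_api.get_topic(name)
    if kafka_topic is None:
        # Assumption: None means the topic does not exist yet.
        kafka_topic = kafka_api.create_topic(
            name, schema, schema_version,
            replicas=replicas, partitions=partitions,
        )
    return kafka_topic
```

Note that this check-then-create sequence is not atomic, so a concurrent caller could still create the topic between the two calls.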
[source] get_topics #
get_topics() -> list[kafka_topic.KafkaTopic]
Get all Kafka topics.
| RETURNS | DESCRIPTION |
|---|---|
| list[kafka_topic.KafkaTopic] | List of KafkaTopic objects. |

| RAISES | DESCRIPTION |
|---|---|
| hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
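
As a rough illustration of working with the returned list, the sketch below collects the topic names in sorted order. The helper topic_names is hypothetical, and it assumes each KafkaTopic exposes a name attribute.

```python
def topic_names(kafka_api):
    """Return the names of all topics, sorted alphabetically.

    `kafka_api` is expected to behave like KafkaApi (get_topics).
    """
    # Assumption: every KafkaTopic object has a `.name` attribute.
    return sorted(topic.name for topic in kafka_api.get_topics())
```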
[source] get_schemas #
get_schemas(subject: str) -> list[kafka_schema.KafkaSchema]
Get all schema versions for the subject.
| PARAMETER | DESCRIPTION |
|---|---|
| subject | Subject name. TYPE: str |

| RETURNS | DESCRIPTION |
|---|---|
| list[kafka_schema.KafkaSchema] | List of KafkaSchema objects. |

| RAISES | DESCRIPTION |
|---|---|
| hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
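
One possible use of get_schemas is to pick the most recent version registered for a subject. This is a minimal sketch under assumptions: latest_schema is a hypothetical helper, and each KafkaSchema is assumed to expose an integer version attribute.

```python
def latest_schema(kafka_api, subject):
    """Return the schema with the highest version for `subject`, or None.

    `kafka_api` is expected to behave like KafkaApi (get_schemas).
    """
    schemas = kafka_api.get_schemas(subject)
    if not schemas:
        # No versions are registered for this subject.
        return None
    # Assumption: every KafkaSchema object has an integer `.version` attribute.
    return max(schemas, key=lambda kafka_schema: kafka_schema.version)
```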
[source] get_schema #
get_schema(
subject: str, version: int
) -> kafka_schema.KafkaSchema | None
Get schema given subject name and version.
| PARAMETER | DESCRIPTION |
|---|---|
| subject | Subject name. TYPE: str |
| version | Version number. TYPE: int |

| RETURNS | DESCRIPTION |
|---|---|
| kafka_schema.KafkaSchema \| None | The KafkaSchema object or None. |

| RAISES | DESCRIPTION |
|---|---|
| hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
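
Callers that cannot proceed without a schema may prefer an explicit error over a None return value. The sketch below shows one way to do that; require_schema is a hypothetical helper, and the choice of LookupError is illustrative rather than anything the library prescribes.

```python
def require_schema(kafka_api, subject, version):
    """Return the requested schema, raising LookupError if get_schema gives None.

    `kafka_api` is expected to behave like KafkaApi (get_schema).
    """
    kafka_schema = kafka_api.get_schema(subject, version)
    if kafka_schema is None:
        raise LookupError(
            f"No schema found for subject {subject!r} at version {version}"
        )
    return kafka_schema
```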
[source] get_default_config #
Get the configuration to set up a Producer or Consumer for a Kafka broker using confluent-kafka.
import hopsworks
project = hopsworks.login()
kafka_api = project.get_kafka_api()
kafka_conf = kafka_api.get_default_config()
from confluent_kafka import Producer
producer = Producer(kafka_conf)
| RETURNS | DESCRIPTION |
|---|---|
| dict | The Kafka configuration. |

| RAISES | DESCRIPTION |
|---|---|
| hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
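
The example above builds a Producer. A confluent-kafka Consumer additionally needs a consumer group, so the returned dict usually has to be extended first. The sketch below shows one way to do this without mutating the original configuration. The helper build_consumer_config is hypothetical; group.id and auto.offset.reset are standard confluent-kafka (librdkafka) properties, but whether the default configuration already sets them is not stated here.

```python
def build_consumer_config(default_conf, group_id, offset_reset="earliest"):
    """Return a copy of the default config extended with consumer settings."""
    # Copy so the dict returned by get_default_config() stays untouched.
    conf = dict(default_conf)
    # Consumer group this client joins.
    conf["group.id"] = group_id
    # Where to start reading when the group has no committed offset.
    conf["auto.offset.reset"] = offset_reset
    return conf
```

The result could then be passed to confluent_kafka.Consumer, for example Consumer(build_consumer_config(kafka_api.get_default_config(), "my_group")), where "my_group" is a placeholder group name.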