KafkaTopic API#
Handle#
get_kafka_api#
Project.get_kafka_api()
Get the kafka api for the project.
Returns
KafkaApi
: The Kafka Api handle
Configuration#
get_default_config#
KafkaApi.get_default_config()
Get the configuration to set up a Producer or Consumer for a Kafka broker using confluent-kafka.
import hopsworks
connection = hopsworks.connection()
project = connection.get_project()
kafka_api = project.get_kafka_api()
kafka_conf = kafka_api.get_default_config()
from confluent_kafka import Producer
producer = Producer(kafka_conf)
dict
: The kafka configuration
Raises
RestAPIError
: If unable to get the kafka configuration.
Creation#
create_topic#
KafkaApi.create_topic(name, schema, schema_version, replicas=1, partitions=1)
Create a new kafka topic.
import hopsworks
connection = hopsworks.connection()
project = connection.get_project()
kafka_api = project.get_kafka_api()
kafka_topic = kafka_api.create_topic("my_topic", "my_schema", 1)
- name
str
: name of the topic - schema
str
: subject name of the schema - schema_version
int
: version of the schema - replicas
int
: replication factor for the topic - partitions
int
: partitions for the topic
Returns
KafkaTopic
: The KafkaTopic object
Raises
RestAPIError
: If unable to create the topic
Retrieval#
get_topic#
KafkaApi.get_topic(name)
Get kafka topic by name.
Arguments
- name
str
: name of the topic
Returns
KafkaTopic
: The KafkaTopic object
Raises
RestAPIError
: If unable to get the topic
get_topics#
KafkaApi.get_topics()
Get all kafka topics.
Returns
List[KafkaTopic]
: List of KafkaTopic objects
Raises
RestAPIError
: If unable to get the topics
Properties#
name#
Name of the topic
partitions#
Number of partitions for the topic
replicas#
Replication factor for the topic
schema#
Schema for the topic
Methods#
delete#
KafkaTopic.delete()
Delete the topic
Potentially dangerous operation
This operation deletes the topic.
Raises
RestAPIError
.