KafkaTopic API#
Handle#
get_kafka_api#
Project.get_kafka_api()
Get the kafka api for the project.
Returns
KafkaApi: The Kafka Api handle
Configuration#
get_default_config#
KafkaApi.get_default_config()
Get the configuration to set up a Producer or Consumer for a Kafka broker using confluent-kafka.
import hopsworks
project = hopsworks.login()
kafka_api = project.get_kafka_api()
kafka_conf = kafka_api.get_default_config()
from confluent_kafka import Producer
producer = Producer(kafka_conf)
dict: The kafka configuration
Raises
RestAPIError: If unable to get the kafka configuration.
Creation#
create_topic#
KafkaApi.create_topic(name, schema, schema_version, replicas=1, partitions=1)
Create a new kafka topic.
import hopsworks
project = hopsworks.login()
kafka_api = project.get_kafka_api()
kafka_topic = kafka_api.create_topic("my_topic", "my_schema", 1)
- name
str: name of the topic - schema
str: subject name of the schema - schema_version
int: version of the schema - replicas
int: replication factor for the topic - partitions
int: partitions for the topic
Returns
KafkaTopic: The KafkaTopic object
Raises
RestAPIError: If unable to create the topic
Retrieval#
get_topic#
KafkaApi.get_topic(name)
Get kafka topic by name.
Arguments
- name
str: name of the topic
Returns
KafkaTopic: The KafkaTopic object
Raises
RestAPIError: If unable to get the topic
get_topics#
KafkaApi.get_topics()
Get all kafka topics.
Returns
List[KafkaTopic]: List of KafkaTopic objects
Raises
RestAPIError: If unable to get the topics
Properties#
name#
Name of the topic
partitions#
Number of partitions for the topic
replicas#
Replication factor for the topic
schema#
Schema for the topic
Methods#
delete#
KafkaTopic.delete()
Delete the topic
Potentially dangerous operation
This operation deletes the topic.
Raises
RestAPIError.