hopsworks.core.kafka_api #

[source] KafkaApi #

For backwards compatibility, hopsworks.core.kafka_api.KafkaApi is still available as hsfs.core.kafka_api.KafkaApi. Avoid this alias in new code, as it is slated for deprecation.

[source] create_topic #

create_topic(
    name: str,
    schema: str,
    schema_version: int,
    replicas: int = 1,
    partitions: int = 1,
) -> kafka_topic.KafkaTopic

Create a new Kafka topic.

import hopsworks

project = hopsworks.login()

kafka_api = project.get_kafka_api()

kafka_topic = kafka_api.create_topic("my_topic", "my_schema", 1)
PARAMETER DESCRIPTION
name

Name of the topic.

TYPE: str

schema

Subject name of the schema.

TYPE: str

schema_version

Version of the schema.

TYPE: int

replicas

Replication factor for the topic.

TYPE: int DEFAULT: 1

partitions

Number of partitions for the topic.

TYPE: int DEFAULT: 1

RETURNS DESCRIPTION
kafka_topic.KafkaTopic

The KafkaTopic object.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

[source] create_schema #

create_schema(
    subject: str, schema: dict
) -> kafka_schema.KafkaSchema

Create a new Kafka schema.

import hopsworks

project = hopsworks.login()

kafka_api = project.get_kafka_api()

avro_schema = {
  "type": "record",
  "name": "tutorial",
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    {
      "name": "data",
      "type": "string"
    }
  ]
}

kafka_schema = kafka_api.create_schema("my_schema", avro_schema)
PARAMETER DESCRIPTION
subject

Subject name of the schema.

TYPE: str

schema

Avro schema definition.

TYPE: dict

RETURNS DESCRIPTION
kafka_schema.KafkaSchema

The KafkaSchema object.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

[source] get_topic #

get_topic(name: str) -> kafka_topic.KafkaTopic | None

Get a Kafka topic by name.

PARAMETER DESCRIPTION
name

Name of the topic.

TYPE: str

RETURNS DESCRIPTION
kafka_topic.KafkaTopic | None

The KafkaTopic object or None if not found.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.
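Because get_topic returns None for a missing topic instead of raising, it can back a simple get-or-create pattern. The sketch below is illustrative only: `get_or_create_topic` is a hypothetical helper, not part of the Hopsworks API, and it assumes the schema subject and version passed in are already registered.

```python
def get_or_create_topic(kafka_api, name, schema, schema_version,
                        replicas=1, partitions=1):
    """Return the topic called `name`, creating it if get_topic finds nothing.

    Hypothetical helper (not part of hopsworks); `kafka_api` is expected to be
    the object returned by project.get_kafka_api().
    """
    topic = kafka_api.get_topic(name)
    if topic is None:
        # Topic not found: create it using the documented create_topic signature.
        topic = kafka_api.create_topic(
            name, schema, schema_version,
            replicas=replicas, partitions=partitions,
        )
    return topic


# Usage, assuming a reachable Hopsworks project:
#   import hopsworks
#   project = hopsworks.login()
#   topic = get_or_create_topic(project.get_kafka_api(), "my_topic", "my_schema", 1)
```

The lookup and the creation are separate requests, so two callers racing on the same name could both attempt to create the topic; in that case the second call would presumably surface as a RestAPIError.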

[source] get_topics #

get_topics() -> list[kafka_topic.KafkaTopic]

Get all Kafka topics.

RETURNS DESCRIPTION
list[kafka_topic.KafkaTopic]

List of KafkaTopic objects.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

[source] get_subjects #

get_subjects() -> list[str]

Get all subjects.

RETURNS DESCRIPTION
list[str]

List of registered subjects.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

[source] get_schemas #

get_schemas(subject: str) -> list[kafka_schema.KafkaSchema]

Get all schema versions for the subject.

PARAMETER DESCRIPTION
subject

Subject name.

TYPE: str

RETURNS DESCRIPTION
list[kafka_schema.KafkaSchema]

List of KafkaSchema objects.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.
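get_subjects and get_schemas compose naturally into an overview of everything registered in the project. The following is a minimal sketch; `schemas_by_subject` is a hypothetical helper name, and it relies only on the two methods documented above.

```python
def schemas_by_subject(kafka_api):
    """Map each registered subject to the list of its schema versions.

    Hypothetical helper (not part of hopsworks); `kafka_api` is expected to be
    the object returned by project.get_kafka_api().
    """
    return {
        subject: kafka_api.get_schemas(subject)
        for subject in kafka_api.get_subjects()
    }


# Usage, assuming a reachable Hopsworks project:
#   import hopsworks
#   project = hopsworks.login()
#   for subject, schemas in schemas_by_subject(project.get_kafka_api()).items():
#       print(subject, len(schemas))
```

This issues one get_schemas request per subject, so it is best suited to projects with a modest number of subjects.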

[source] get_schema #

get_schema(
    subject: str, version: int
) -> kafka_schema.KafkaSchema | None

Get schema given subject name and version.

PARAMETER DESCRIPTION
subject

Subject name.

TYPE: str

version

Version number.

TYPE: int

RETURNS DESCRIPTION
kafka_schema.KafkaSchema | None

KafkaSchema object or None if it does not exist.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.
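Since get_schema signals a missing schema with None rather than an exception, callers that cannot proceed without the schema may want to fail fast. A minimal sketch, where `require_schema` is a hypothetical helper and the choice of LookupError is an assumption made for illustration:

```python
def require_schema(kafka_api, subject, version):
    """Return the requested schema, raising LookupError if it does not exist.

    Hypothetical helper (not part of hopsworks); `kafka_api` is expected to be
    the object returned by project.get_kafka_api().
    """
    schema = kafka_api.get_schema(subject, version)
    if schema is None:
        raise LookupError(
            f"No schema registered for subject {subject!r} at version {version}"
        )
    return schema


# Usage, assuming a reachable Hopsworks project:
#   import hopsworks
#   project = hopsworks.login()
#   schema = require_schema(project.get_kafka_api(), "my_schema", 1)
```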

[source] get_default_config #

get_default_config(
    internal_kafka: bool | None = None,
) -> dict

Get the configuration to set up a Producer or Consumer for a Kafka broker using confluent-kafka.

import hopsworks
from confluent_kafka import Producer

project = hopsworks.login()

kafka_api = project.get_kafka_api()

kafka_conf = kafka_api.get_default_config()

producer = Producer(kafka_conf)
RETURNS DESCRIPTION
dict

The Kafka configuration.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.
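The same configuration can serve as the base for a consumer. A confluent-kafka Consumer additionally needs a `group.id`, so the sketch below copies the default configuration and adds consumer settings on top. `build_consumer_config` is a hypothetical helper, and whether the default configuration already carries any of these keys is not stated here; the helper simply overrides them.

```python
def build_consumer_config(kafka_api, group_id, offset_reset="earliest"):
    """Return a copy of the default config extended with consumer settings.

    Hypothetical helper (not part of hopsworks); `kafka_api` is expected to be
    the object returned by project.get_kafka_api().
    """
    # Copy so the dict returned by get_default_config is left untouched.
    config = dict(kafka_api.get_default_config())
    # Standard librdkafka consumer properties.
    config["group.id"] = group_id
    config["auto.offset.reset"] = offset_reset
    return config


# Usage, assuming a reachable Hopsworks project and confluent-kafka installed:
#   import hopsworks
#   from confluent_kafka import Consumer
#   project = hopsworks.login()
#   consumer = Consumer(build_consumer_config(project.get_kafka_api(), "my_group"))
#   consumer.subscribe(["my_topic"])
```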