Skip to content

How To Consume Message From A Topic#

Introduction#

A Consumer is a process which reads messages from a Kafka topic. In Hopsworks, all user roles are capable of performing 'Read' and 'Describe' actions on Kafka topics within projects that they are a member of or are shared with them.

Prerequisites#

This guide requires that you have previously produced messages to a kafka topic.

Code#

In this guide, you will learn how to consume messages from a kafka topic.

Step 1: Get the Kafka API#

import hopsworks

project = hopsworks.login()

kafka_api = project.get_kafka_api()

Step 2: Configure confluent-kafka client#

consumer_config = kafka_api.get_default_config()
consumer_config['default.topic.config'] = {'auto.offset.reset': 'earliest'}

from confluent_kafka import Consumer

consumer = Consumer(consumer_config)

Step 3: Consume messages from a topic#

# Subscribe to topic
consumer.subscribe(["my_topic"])

for i in range(0, 10):
    msg = consumer.poll(timeout=10.0)
    print(msg.value())

API Reference#

KafkaTopic