How To Consume Messages From A Topic#
Introduction#
A consumer is a process that reads messages from a Kafka topic. In Hopsworks, every user role can perform 'Read' and 'Describe' actions on Kafka topics within projects that they are a member of or that have been shared with them.
Prerequisites#
This guide requires that you have previously produced messages to a Kafka topic.
Code#
In this guide, you will learn how to consume messages from a Kafka topic.
Step 1: Get the Kafka API#
import hopsworks
project = hopsworks.login()
kafka_api = project.get_kafka_api()
Step 2: Configure confluent-kafka client#
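from confluent_kafka import Consumer
# Start from the project's default Kafka client configuration
consumer_config = kafka_api.get_default_config()
# Read from the beginning of the topic when no committed offset exists
# ('default.topic.config' is deprecated; set the property at the top level)
consumer_config['auto.offset.reset'] = 'earliest'
consumer = Consumer(consumer_config)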
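A confluent-kafka consumer needs a `group.id` before it can subscribe to a topic. If the default configuration does not already provide one, a small helper such as the following could add it without mutating the defaults. This is a minimal sketch: the helper name `build_consumer_config`, the group name `my_group`, and the placeholder broker address are hypothetical and not part of the Hopsworks API.

```python
def build_consumer_config(base_config, group_id, offset_reset="earliest"):
    """Return a copy of base_config with consumer-specific settings added."""
    config = dict(base_config)  # copy so the original defaults stay untouched
    config.setdefault("group.id", group_id)  # keep a group id that is already set
    config["auto.offset.reset"] = offset_reset  # where to start without a committed offset
    return config


if __name__ == "__main__":
    # Stand-in for kafka_api.get_default_config(); the value is a placeholder
    defaults = {"bootstrap.servers": "broker:9092"}
    print(build_consumer_config(defaults, group_id="my_group"))
```

In this guide's flow, the returned dictionary would be passed to `Consumer(...)` in place of the hand-edited `consumer_config`.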
Step 3: Consume messages from a topic#
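# Subscribe to topic
consumer.subscribe(["my_topic"])
for i in range(0, 10):
    msg = consumer.poll(timeout=10.0)
    if msg is None:  # no message arrived within the timeout
        continue
    if msg.error():  # the client or broker reported an error
        print(msg.error())
        continue
    print(msg.value())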
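The polling loop above can also be wrapped in a reusable function that collects message values instead of printing them. The sketch below assumes the payloads are UTF-8 encoded text. The function name `consume_values` and its parameters are illustrative, not part of Hopsworks or confluent-kafka. It relies only on the `poll()`, `value()` and `error()` calls already used in this guide.

```python
def consume_values(consumer, max_messages=10, max_polls=30, timeout=10.0):
    """Poll the consumer and return up to max_messages decoded values."""
    values = []
    for _ in range(max_polls):  # bound the number of polls so the loop always ends
        if len(values) >= max_messages:
            break
        msg = consumer.poll(timeout=timeout)
        if msg is None:  # nothing arrived within the timeout
            continue
        if msg.error():  # skip error events instead of treating them as data
            print(f"Skipping message with error: {msg.error()}")
            continue
        raw = msg.value()
        # Assumption: payloads are UTF-8 text; a message may also carry no value
        values.append(raw.decode("utf-8") if raw is not None else None)
    return values


# Usage with the consumer from Step 2 (requires a reachable Kafka cluster):
# values = consume_values(consumer)
# consumer.close()  # leave the consumer group cleanly when done
```

Capping the number of polls keeps the function from blocking indefinitely when the topic holds fewer messages than requested.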