
How to close a Kafka consumer once all messages are consumed?

I have the following program, which consumes all the messages arriving on a Kafka topic.

from kafka import KafkaConsumer

consumer = KafkaConsumer('my_test_topic',
                         group_id='my-group',
                         bootstrap_servers=['my_kafka:9092'])
# Commit and print every message received on the topic.
for message in consumer:
    consumer.commit()
    print("%s key=%s value=%s" % (message.topic, message.key,
                                  message.value))
# close() is an instance method, so call it on the consumer object.
consumer.close()

With this program I can consume all the messages arriving on the topic. However, once all the messages have been consumed I want to close the consumer, and that never happens: the loop keeps waiting for new messages. How can I close the consumer at that point?

pankmish asked Jul 17 '17 12:07



1 Answer

I can now close the Kafka consumer by passing the consumer_timeout_ms argument to the KafkaConsumer constructor. It takes a timeout value in milliseconds. Here is the code snippet:

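from kafka import KafkaConsumer

consumer = KafkaConsumer('my_test_topic',
                         group_id='my-group',
                         bootstrap_servers=['my_kafka:9092'],
                         # Stop iterating after 1000 ms without a new message.
                         consumer_timeout_ms=1000)
for message in consumer:
    consumer.commit()
    print("%s key=%s value=%s" % (message.topic, message.key,
                                  message.value))
# Reached once the iterator times out; call close() on the instance,
# not on the KafkaConsumer class.
consumer.close()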

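In the code above, if the consumer does not receive any message for 1 second, the iterator stops, the for loop exits, and consumer.close() closes the consumer. Note that this only means "no new message within the timeout", so pick a value longer than the gaps you expect between messages.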
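The same idea can also be written with an explicit poll() loop, which makes the stop condition visible and guarantees that close() runs even if processing fails. This is only a sketch under some assumptions: drain and idle_timeout_ms are names I made up for illustration, and the consumer argument is assumed to behave like kafka-python's KafkaConsumer (poll(timeout_ms=...) returning a dict of partition to list of records, plus commit() and close()).

```python
def drain(consumer, idle_timeout_ms=1000):
    """Read records until one poll() returns nothing, then close the consumer.

    `drain` and `idle_timeout_ms` are hypothetical names for this sketch.
    `consumer` is assumed to behave like kafka-python's KafkaConsumer.
    """
    handled = []
    try:
        while True:
            # poll() returns {partition: [records]}, or an empty dict if
            # nothing arrived within the timeout.
            batches = consumer.poll(timeout_ms=idle_timeout_ms)
            if not batches:
                break  # idle for the whole timeout: treat the topic as drained
            for records in batches.values():
                for record in records:
                    handled.append((record.topic, record.key, record.value))
            consumer.commit()  # commit once per batch, after handling it
    finally:
        consumer.close()  # runs even if handling a record raises
    return handled


# Usage against a real broker (topic, group and host taken from the post):
#   from kafka import KafkaConsumer
#   consumer = KafkaConsumer('my_test_topic',
#                            group_id='my-group',
#                            bootstrap_servers=['my_kafka:9092'])
#   for topic, key, value in drain(consumer, idle_timeout_ms=1000):
#       print("%s key=%s value=%s" % (topic, key, value))
```

One caveat, as far as I understand it: the first poll() can come back empty while the consumer is still joining the group, so a short idle timeout may end the loop before any message is read. A larger timeout for this loop is probably the simplest workaround.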

pankmish answered Nov 01 '22 08:11