Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

kafka-python read from last produced message after a consumer restart

i am using kafka-python to consume messages from a kafka queue (kafka version 0.10.2.0). In particular i am using KafkaConsumer type. If the consumer stops and after a while it is restarted i would like to restart from the latest produced message, that is drop all the messages produced during the time the consumer was down. How can i achieve this?

Thanks

like image 442
ugomaria Avatar asked Dec 06 '22 15:12

ugomaria


1 Answers

Thanks,

it works!

This is a simplified versione of my code:

consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True)
#dummy poll
consumer.poll()
#go to end of the stream
consumer.seek_to_end()
#start iterate
for message in consumer:
    print(message)

consumer.close()

The documentation states that the poll() method is incompatible with the iterator interface, which i guess is the the one I use in the loop at the end of my script. However from initial testing, this code looks like to work correctly.

Is it safe to use it? Or did I misunderstood the docuementation?

Thanks

like image 153
ugomaria Avatar answered Jan 15 '23 22:01

ugomaria