i am using kafka-python to consume messages from a kafka queue (kafka version 0.10.2.0). In particular i am using KafkaConsumer type. If the consumer stops and after a while it is restarted i would like to restart from the latest produced message, that is drop all the messages produced during the time the consumer was down. How can i achieve this?
Thanks
Thanks,
it works!
This is a simplified versione of my code:
consumer = KafkaConsumer('mytopic', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True)
#dummy poll
consumer.poll()
#go to end of the stream
consumer.seek_to_end()
#start iterate
for message in consumer:
print(message)
consumer.close()
The documentation states that the poll() method is incompatible with the iterator interface, which i guess is the the one I use in the loop at the end of my script. However from initial testing, this code looks like to work correctly.
Is it safe to use it? Or did I misunderstood the docuementation?
Thanks
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With