I am using the code below to read messages from a topic. I am facing two issues. Whenever I start the consumer, it reads all the messages in the queue. How do I read only the unread messages?
from kafka import KafkaConsumer

consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])
for message in consumer:
    consumer.commit()
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))
To read all messages from the start of the topic, add the '--from-beginning' option to the Kafka console consumer command: 'kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic myfirst --from-beginning'.
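A minimal sketch of the same idea from kafka-python, assuming the broker and topic from the command above and a hypothetical group id "myfirst-reader". The auto_offset_reset setting only takes effect when the group has no committed offset (or it is out of range); it does not rewind a group that has already committed, so re-reading everything needs a fresh group id.

```python
# Sketch: kafka-python counterpart of the console consumer's --from-beginning.
# Assumptions: broker at 127.0.0.1:9092, topic "myfirst" (from the command above),
# hypothetical group id "myfirst-reader".


def consumer_config(group_id, from_beginning=False):
    """Build keyword arguments for KafkaConsumer.

    auto_offset_reset is consulted only when the group has no valid committed
    offset; an existing committed offset always wins.
    """
    return {
        "bootstrap_servers": ["127.0.0.1:9092"],
        "group_id": group_id,
        # 'earliest' behaves like --from-beginning; 'latest' is kafka-python's default
        "auto_offset_reset": "earliest" if from_beginning else "latest",
    }


def read_all(topic="myfirst", group_id="myfirst-reader"):
    # Imported here so consumer_config() works without kafka-python installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(topic, **consumer_config(group_id, from_beginning=True))
    for message in consumer:
        print(message.offset, message.value)


if __name__ == "__main__":
    read_all()
```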
Zero lag on every partition indicates that the messages have been consumed and their offsets committed by the consumer.
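A rough sketch of checking that condition from Python, assuming kafka-python and a reachable broker. Lag here is taken as the log end offset minus the group's committed offset; a partition with no committed offset is reported as unknown (None) rather than guessed. The topic and group names passed to fetch_lag are placeholders.

```python
# Sketch: per-partition consumer lag = log end offset - committed offset.
# Assumptions: kafka-python installed, broker reachable; topic/group are placeholders.


def partition_lag(end_offsets, committed):
    """Return {partition: lag}; None where the group has committed nothing."""
    lag = {}
    for partition, end in end_offsets.items():
        offset = committed.get(partition)
        lag[partition] = None if offset is None else end - offset
    return lag


def fully_consumed(lag):
    # True only if every partition has a committed offset equal to its log end.
    return all(value == 0 for value in lag.values())


def fetch_lag(topic, group_id, bootstrap_servers="localhost:9092"):
    # Imported here so the pure helpers above work without kafka-python.
    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(group_id=group_id,
                             bootstrap_servers=bootstrap_servers,
                             enable_auto_commit=False)
    try:
        partitions = [TopicPartition(topic, p)
                      for p in consumer.partitions_for_topic(topic) or ()]
        ends = consumer.end_offsets(partitions)          # {TopicPartition: int}
        committed = {tp: consumer.committed(tp) for tp in partitions}
        return partition_lag(ends, committed)
    finally:
        consumer.close()
```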
Yes, Kafka's design allows consumers from one consumer group to consume messages from multiple topics.
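A small sketch of one consumer in one group reading several topics. Each record carries the topic it came from, so processing can be routed on message.topic. The topic names "orders" and "payments" and both handler functions are hypothetical; subscribe() accepts a list of topics (or a pattern) in kafka-python.

```python
# Sketch: one consumer, one group, several topics, dispatch on message.topic.
# Assumptions: hypothetical topics "orders"/"payments", group "my-group",
# broker localhost:9092; kafka-python is needed only for run().


def handle_order(message):
    return "order:%s" % message.value


def handle_payment(message):
    return "payment:%s" % message.value


HANDLERS = {"orders": handle_order, "payments": handle_payment}


def dispatch(message):
    """Route a record to the handler registered for its topic."""
    handler = HANDLERS.get(message.topic)
    if handler is None:
        raise ValueError("no handler for topic %r" % message.topic)
    return handler(message)


def run():
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(group_id="my-group",
                             bootstrap_servers=["localhost:9092"])
    consumer.subscribe(topics=list(HANDLERS))  # all topics share the same group
    for message in consumer:
        print(dispatch(message))
```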
As @Kenji said, you have to commit the offsets with consumer.commit(). If you don't want to commit manually, you can enable autocommit by passing enable_auto_commit=True to your KafkaConsumer. You may also want to tune auto_commit_interval_ms, which is the interval in milliseconds between automatic commits. See here: http://kafka-python.readthedocs.org/en/master/apidoc/KafkaConsumer.html.
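A minimal sketch of both commit styles, reusing the topic, group, and broker from the question. In recent kafka-python releases enable_auto_commit already defaults to True with a 5000 ms interval, so passing them explicitly mainly documents the intent; the helper function names are hypothetical. With manual commits, committing after a record is processed avoids marking it as read before it has been handled.

```python
# Sketch: auto-commit vs. manual commit with kafka-python.
# Assumptions: topic "my-topic", group "my-group", broker localhost:9092 (from the
# question); 5000 ms mirrors kafka-python's default auto_commit_interval_ms.


def auto_commit_config(interval_ms=5000):
    return {
        "group_id": "my-group",
        "bootstrap_servers": ["localhost:9092"],
        "enable_auto_commit": True,               # commit offsets in the background
        "auto_commit_interval_ms": interval_ms,   # how often, in milliseconds
    }


def manual_commit_config():
    cfg = auto_commit_config()
    cfg["enable_auto_commit"] = False
    del cfg["auto_commit_interval_ms"]            # unused without auto-commit
    return cfg


def consume(manual=False):
    from kafka import KafkaConsumer

    cfg = manual_commit_config() if manual else auto_commit_config()
    consumer = KafkaConsumer("my-topic", **cfg)
    for message in consumer:
        print(message.offset, message.value)
        if manual:
            # Commit only after the record has been processed.
            consumer.commit()
```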