I am using the below code to read messages from a topic. How do i delete a message after it is read?
from kafka import KafkaConsumer
consumer = KafkaConsumer('my-topic',
group_id='my-group',
bootstrap_servers=['localhost:9092'])
for message in consumer:
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
There is no way to delete a specific message from kafka - kafka simply is not designed to do that. The only way to delete messages is by setting log.retention.hours
in kafka's config/server.properties
to a value of your liking. The default is 168 - meaning that messages are not kept after 168 hours.
If you instead are looking for a way to read messages from a specific offset - i.e. not read from the beginning every time, look here http://kafka-python.readthedocs.org/en/master/apidoc/KafkaConsumer.htmlcommit()
- committing read offsets to kafkaseek_to_end()
- fast forward to consuming only newly arriving messagesseek()
- moving to a given offset (presumably stored somewhere else than in kafka)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With