Using kafka-python-1.0.2.
If I have a topic with 10 partitions, how do I commit the offset for one particular partition while looping through the various partitions and messages? I can't seem to find an example of this anywhere, in the docs or otherwise.
From the docs, I want to use:
consumer.commit(offsets=offsets)
Specifically, how do I create the partition and OffsetAndMetadata dictionary required for offsets (dict, optional) – {TopicPartition: OffsetAndMetadata}.
I was hoping the function call would just be something like:
consumer.commit(partition, offset)
but this does not seem to be the case.
Thanks in advance.
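You just need to call consumer.commit(). Called with no arguments, it commits the currently consumed offsets for all assigned partitions: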
from kafka import KafkaConsumer, TopicPartition

KAFKA_TOPIC_NAME = 'KAFKA_TOPIC_NAME'
KAFKA_CONSUMER_GROUP = 'KAFKA_CONSUMER_GROUP'

consumer = KafkaConsumer(
    KAFKA_TOPIC_NAME,
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=False,  # offsets are committed manually below
    group_id=KAFKA_CONSUMER_GROUP,
)

for message in consumer:
    print(message.value)
    consumer.commit()  # <--- This is what we need
    # Optionally, to check that the commit went through
    print('New Kafka offset: %s' % consumer.committed(TopicPartition(KAFKA_TOPIC_NAME, message.partition)))
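If you do want to commit a single partition, as the question asks, here is a minimal sketch of building the {TopicPartition: OffsetAndMetadata} dict from a consumed message. The helper names offsets_for and commit_message are hypothetical, not part of kafka-python. The import path kafka.structs is an assumption about the installed release (older releases may keep these structs in a different module), and padding any extra OffsetAndMetadata fields with -1 is likewise an assumption about newer releases. Note that the committed offset should be the offset of the next message to read, so it is the message's offset plus one.

```python
from collections import namedtuple

try:
    # Assumption: the installed kafka-python release exposes both structs here;
    # older releases may keep them in a different module.
    from kafka.structs import OffsetAndMetadata, TopicPartition
except ImportError:
    # Stand-ins with the same field names, only so this sketch runs
    # without the library installed.
    TopicPartition = namedtuple('TopicPartition', ['topic', 'partition'])
    OffsetAndMetadata = namedtuple('OffsetAndMetadata', ['offset', 'metadata'])


def offsets_for(message):
    """Build the {TopicPartition: OffsetAndMetadata} dict for one message."""
    tp = TopicPartition(message.topic, message.partition)
    # Assumption: fields beyond (offset, metadata), which some newer releases
    # add, can be filled with -1 to mean "unknown".
    extra = [-1] * (len(OffsetAndMetadata._fields) - 2)
    # The committed offset is the NEXT offset to consume, hence +1.
    return {tp: OffsetAndMetadata(message.offset + 1, '', *extra)}


def commit_message(consumer, message):
    """Commit only the partition that `message` came from."""
    consumer.commit(offsets=offsets_for(message))
```

Inside the consumer loop, calling commit_message(consumer, message) in place of consumer.commit() would then commit just that message's partition and leave the others untouched.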