
kafka-python - How do I commit a partition?

Using kafka-python-1.0.2.

If I have a topic with 10 partitions, how do I go about committing a particular partition while looping through the various partitions and messages? I just can't seem to find an example of this anywhere, in the docs or otherwise.

From the docs, I want to use:

consumer.commit(offsets=offsets)

Specifically, how do I create the partition and OffsetAndMetadata dictionary required for offsets (dict, optional) – {TopicPartition: OffsetAndMetadata}?

I was hoping the function call would just be something like:

consumer.commit(partition, offset)

but this does not seem to be the case.

Thanks in advance.

asked Apr 12 '16 17:04 by Johnny Gasyna


1 Answer

You just need to call consumer.commit(). With no arguments, it commits the currently consumed offsets for all assigned partitions:

from kafka import KafkaConsumer, TopicPartition

KAFKA_TOPIC_NAME = 'KAFKA_TOPIC_NAME'
KAFKA_CONSUMER_GROUP = 'KAFKA_CONSUMER_GROUP'

consumer = KafkaConsumer(
    KAFKA_TOPIC_NAME,
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=False,  # offsets are committed manually below
    group_id=KAFKA_CONSUMER_GROUP,
)

for message in consumer:
    print(message.value)
    consumer.commit()  # <--- This is what we need
    # Optionally, check that the commit went through
    tp = TopicPartition(KAFKA_TOPIC_NAME, message.partition)
    print('New Kafka offset: %s' % consumer.committed(tp))
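
If you want to commit only one partition, as the question asks, the offsets argument takes a {TopicPartition: OffsetAndMetadata} dict. Below is a minimal sketch of building that dict from a consumed message. The helper name offsets_for is hypothetical, and the import path assumes a release that exposes both classes from kafka.structs (older releases such as 1.0.x may expose them from kafka.common instead). The namedtuple stand-ins only exist so the snippet runs without kafka-python installed, and the padding for extra trailing fields is an assumption about newer releases.

```python
from collections import namedtuple

try:
    from kafka.structs import TopicPartition, OffsetAndMetadata
except ImportError:
    # Stand-ins (assumption) so the sketch runs without kafka-python installed
    TopicPartition = namedtuple('TopicPartition', ['topic', 'partition'])
    OffsetAndMetadata = namedtuple('OffsetAndMetadata', ['offset', 'metadata'])


def offsets_for(message):
    """Build the commit dict for a single message (hypothetical helper)."""
    tp = TopicPartition(message.topic, message.partition)
    # Commit offset + 1: the committed value is the next offset to read.
    values = [message.offset + 1, '']  # offset, metadata
    # Assumption: some newer releases add trailing fields such as
    # leader_epoch; pad any extras with -1 ("unknown").
    values += [-1] * (len(OffsetAndMetadata._fields) - len(values))
    return {tp: OffsetAndMetadata(*values)}


# Usage inside the consume loop (needs a running broker):
#     for message in consumer:
#         print(message.value)
#         consumer.commit(offsets=offsets_for(message))
```

Only the partition named in the dict is committed, so the other partitions keep their previously committed offsets.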
answered Sep 27 '22 19:09 by akki