
Kafka - How to commit offset after every message using High-Level consumer?

I'm using Kafka's high-level consumer. Because I'm using Kafka as a 'queue of transactions' for my application, I need to make absolutely sure I don't miss or re-read any messages. I have 2 questions regarding this:

  1. How do I commit the offset to ZooKeeper? I plan to turn off auto-commit and commit the offset after every successfully consumed message, but I can't find any actual code examples of how to do this with the high-level consumer. Can anyone help me with this?

  2. On the other hand, I've heard that committing to ZooKeeper can be slow, so an alternative might be to keep track of the offsets locally. Is this approach advisable? If so, how would you implement it?
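For question 2, one way to track offsets locally is to persist "the next offset to read" yourself after each message is processed. The sketch below is a minimal, hypothetical store for a single partition (the class name and file layout are assumptions, not part of any Kafka API). It writes the offset to a temporary file and atomically renames it over the real file, so a crash never leaves a half-written offset. Note that resuming from a locally stored offset requires a client that can seek, such as seek() on the newer KafkaConsumer; the old high-level consumer does not offer that.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical local offset store for one topic partition. */
class LocalOffsetStore {
    private final Path file;

    LocalOffsetStore(Path file) {
        this.file = file;
    }

    /** Returns the stored "next offset to read", or defaultOffset if nothing was stored yet. */
    public long load(long defaultOffset) throws IOException {
        if (!Files.exists(file)) {
            return defaultOffset;
        }
        String text = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
        return Long.parseLong(text);
    }

    /** Persists the next offset to read: write a temp file, then rename it over the real one. */
    public void save(long nextOffset) throws IOException {
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        Files.write(tmp, Long.toString(nextOffset).getBytes(StandardCharsets.UTF_8));
        // For durability across power loss you would also fsync tmp (e.g. FileChannel.force)
        // before renaming; omitted to keep the sketch short.
        // ATOMIC_MOVE: readers see either the old file or the new one, never a partial write.
        // Whether an existing target is replaced is platform-specific per the Files.move Javadoc.
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
    }
}
```

Call save(offset + 1) only after the message at offset has been fully processed; on restart, load() tells you where to resume. If the processing result and the offset can be written in the same transaction (for example, to the same database), that is what avoids both lost and re-read messages.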

Asked by Hongyi Li on Aug 13 '14 at 18:08


People also ask

Which consumer in Kafka will commit the current offset?

By default, as the consumer reads messages from Kafka, it will periodically commit its current offset (defined as the offset of the next message to be read) for the partitions it is reading from back to Kafka.

How does a consumer commit offsets in Kafka?

The Kafka consumer commits the offset periodically when polling batches. This strategy works well if message processing is synchronous and failures are handled gracefully. Note that starting with Quarkus 1.9, auto-commit is disabled by default there, so you need to enable it explicitly.

How do you manually commit specific offset in Kafka?

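The KafkaConsumer Javadoc method summary lists, among others:

  - assign(Collection<TopicPartition> partitions): manually assign a list of partitions to this consumer.
  - assignment(): get the set of partitions currently assigned to this consumer.
  - close(): close the consumer, waiting indefinitely for any needed cleanup.
  - commitSync(): commit offsets returned on the last poll() for all the subscribed topics and partitions.
  - commitSync(Map<TopicPartition, OffsetAndMetadata> offsets): commit the specified offsets for the specified topics and partitions.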

How does Kafka store offsets for each topic?

Kafka stores offset commits in an internal topic (__consumer_offsets). When a consumer commits an offset, Kafka publishes a commit message to this "commit-log" topic and keeps an in-memory structure that maps group/topic/partition to the latest offset for fast retrieval.
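To make the storage scheme above concrete, here is a toy model, not Kafka's actual implementation: a list of strings stands in for the offsets topic (real entries are binary key/value records), and a map keyed by "group/topic/partition" holds the latest committed offset. The class, method names, and entry format are hypothetical. The replay() method shows why the log alone is enough to rebuild the map: later commits for the same key overwrite earlier ones.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model: an append-only commit log plus an in-memory "latest offset" map. */
class OffsetCache {
    // Stand-in for the internal offsets topic: one "key=offset" entry per commit.
    private final List<String> commitLog = new ArrayList<>();
    // Latest committed offset per "group/topic/partition" key, for fast lookups.
    private final Map<String, Long> latest = new HashMap<>();

    private static String key(String group, String topic, int partition) {
        return group + "/" + topic + "/" + partition;
    }

    /** Records a commit: append it to the log, then update the in-memory map. */
    public void commit(String group, String topic, int partition, long offset) {
        String k = key(group, topic, partition);
        commitLog.add(k + "=" + offset);
        latest.put(k, offset);
    }

    /** Latest committed offset, or -1 if the group never committed this partition. */
    public long fetch(String group, String topic, int partition) {
        return latest.getOrDefault(key(group, topic, partition), -1L);
    }

    /** Read-only view of the commit log. */
    public List<String> log() {
        return Collections.unmodifiableList(commitLog);
    }

    /** Rebuilds the map from the log alone; the last entry for each key wins. */
    static Map<String, Long> replay(List<String> log) {
        Map<String, Long> rebuilt = new HashMap<>();
        for (String entry : log) {
            int eq = entry.lastIndexOf('=');
            rebuilt.put(entry.substring(0, eq), Long.parseLong(entry.substring(eq + 1)));
        }
        return rebuilt;
    }
}
```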


2 Answers

You could first disable auto commit: auto.commit.enable=false

Then commit after each message has been fetched and processed: consumer.commitOffsets(true)
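The ordering in this answer (auto-commit off, process the message, then commit) can be illustrated without a broker. The sketch below is a hypothetical in-memory simulation, not the Kafka API: a list stands in for one partition and a single long stands in for the committed offset, stored as "next offset to read". Because processing happens before the commit, a crash between the two makes the message be read again on restart, but never skipped. So this gives at-least-once delivery; it cannot by itself guarantee the question's "no re-reads" requirement. Also keep in mind that the real commitOffsets() commits the consumed position of every partition the connector owns, not just one message.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of "disable auto-commit, commit after each processed message". */
class CommitPerMessageSimulation {
    private final List<String> partition;          // stands in for one topic partition
    private long committedOffset = 0;               // next offset to read, as a commit stores it
    final List<String> processed = new ArrayList<>();

    CommitPerMessageSimulation(List<String> partition) {
        this.partition = partition;
    }

    /**
     * Consumes from the committed offset onward. If crashAfterProcessing equals an offset,
     * the simulated consumer stops after processing that message but before committing it.
     */
    void run(long crashAfterProcessing) {
        for (long offset = committedOffset; offset < partition.size(); offset++) {
            processed.add(partition.get((int) offset));   // 1. process the message
            if (offset == crashAfterProcessing) {
                return;                                   // simulated crash: no commit happens
            }
            committedOffset = offset + 1;                 // 2. commit the *next* offset to read
        }
    }

    long committedOffset() {
        return committedOffset;
    }
}
```

For example, with messages a, b, c: run(1) processes a and b but only commits offset 1, so a following run(-1) processes b again and then c, ending with committed offset 3.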

Answered by Yueheng Li on Sep 21 '22 at 10:09


There are two relevant settings in the consumer configuration documentation (http://kafka.apache.org/documentation.html#consumerconfigs):

  - auto.commit.enable
  - auto.commit.interval.ms

Making the consumer commit the offset after each message is difficult with these settings, because auto-commit is triggered by a timer interval, not by individual messages. You would have to predict the rate of incoming messages and set the interval accordingly.
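The rate prediction mentioned here is simple arithmetic: at an average rate of r messages per second, an interval of t milliseconds covers about r * t / 1000 messages. The helper below (a hypothetical name, not a Kafka API) picks an auto.commit.interval.ms value for a target number of messages per commit. Real traffic is bursty, so this only approximates per-message commits.

```java
/** Hypothetical helper: estimate auto.commit.interval.ms from a predicted message rate. */
final class CommitIntervalEstimator {
    private CommitIntervalEstimator() {}

    /**
     * Interval in ms so that, on average, about messagesPerCommit messages arrive between
     * commits. Rounds down (committing slightly more often) and never returns less than 1 ms.
     */
    static long intervalMs(double messagesPerSecond, double messagesPerCommit) {
        if (messagesPerSecond <= 0 || messagesPerCommit <= 0) {
            throw new IllegalArgumentException("rates must be positive");
        }
        long ms = (long) Math.floor(messagesPerCommit * 1000.0 / messagesPerSecond);
        return Math.max(1L, ms);
    }
}
```

For example, a predicted 50 messages per second gives intervalMs(50, 1) = 20 ms, which means 50 commit rounds per second for each consumer. At 2000 messages per second the helper is already clamped to 1 ms, so even the smallest interval covers about two messages.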

In general, it is not recommended to make this interval very small: it greatly increases the read/write load on ZooKeeper, and ZooKeeper writes are relatively slow because each one must be agreed on by a quorum to keep the ensemble strongly consistent.

Answered by laughing_man on Sep 20 '22 at 10:09