
How to commit manually with Kafka Stream?


Is there a way to commit manually with Kafka Stream?

With KafkaConsumer, I usually do something like this:

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            // process records
        }
        consumer.commitAsync();
    }

Here I call commit manually, but I don't see a similar API for KStream.

Glide asked Apr 14 '17 17:04




1 Answer

Streams handles commits internally and fully automatically, so there is usually no reason to commit manually. Note that Streams handles this differently from consumer auto-commit: auto-commit is in fact disabled for the internally used consumer, and Streams manages commits "manually". The reason is that commits can only happen at certain points during processing to ensure that no data is lost (there are many internal dependencies with regard to updating state and flushing results).

For more frequent commits, you can reduce the commit interval via the StreamsConfig parameter commit.interval.ms.
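As a rough illustration of that setting, the sketch below builds a plain java.util.Properties object with commit.interval.ms lowered to one second. It uses the literal configuration names so that it runs without the Kafka libraries; the application id, broker address, and class name are assumptions made up for the example, and in a real application the resulting Properties would be passed to the KafkaStreams constructor.

```java
import java.util.Properties;

// Sketch only: builds the configuration one would hand to Kafka Streams.
final class CommitIntervalConfig {
    // Literal config names; StreamsConfig exposes constants for the same keys.
    static final String APPLICATION_ID = "application.id";
    static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
    static final String COMMIT_INTERVAL_MS = "commit.interval.ms";

    static Properties build(long commitIntervalMs) {
        Properties props = new Properties();
        props.put(APPLICATION_ID, "commit-demo");        // hypothetical application id
        props.put(BOOTSTRAP_SERVERS, "localhost:9092");  // assumed local broker
        // A smaller interval makes Streams commit more often.
        props.put(COMMIT_INTERVAL_MS, Long.toString(commitIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build(1000L);
        System.out.println(props.getProperty(COMMIT_INTERVAL_MS));
    }
}
```

A shorter interval means more frequent commits and therefore more commit traffic to the brokers, so the value is a trade-off rather than something to minimize blindly.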

Nevertheless, manual commits are possible indirectly via the low-level Processor API. You can use the context object provided via the init() method to call context#commit(). Note that this is only a request to Streams to commit as soon as possible; it does not issue a commit directly.
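To make the "request, not a direct commit" point concrete, here is a small toy model that runs without Kafka. CommitContext, TaskLoop, and RecordHandler are hypothetical stand-ins invented for this sketch, not Kafka Streams classes; they only mimic the idea that calling commit() sets a flag and the surrounding loop performs the commit at the next safe point. In real Kafka Streams the corresponding call is commit() on the context object received in init().

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of "commit on request"; these are NOT Kafka classes.
final class CommitRequestSketch {

    // Hypothetical stand-in for the commit() method of the processor context.
    interface CommitContext {
        void commit();
    }

    // Hypothetical stand-in for user code that processes one record.
    interface RecordHandler {
        void process(String record, CommitContext context);
    }

    // Hypothetical stand-in for the Streams loop that owns the actual commit.
    static final class TaskLoop implements CommitContext {
        private boolean commitRequested = false;
        private long processed = 0;
        final List<Long> committedPositions = new ArrayList<>();

        @Override
        public void commit() {
            // Only records the request; nothing is committed here.
            commitRequested = true;
        }

        void run(List<String> records, RecordHandler handler) {
            for (String record : records) {
                handler.process(record, this);
                processed++;
                // Safe point: the record is fully processed, so a pending request is honored.
                if (commitRequested) {
                    committedPositions.add(processed);
                    commitRequested = false;
                }
            }
        }
    }

    static List<Long> demo() {
        TaskLoop loop = new TaskLoop();
        // Request a commit whenever the record is a "checkpoint" marker.
        loop.run(List.of("a", "checkpoint", "b", "c", "checkpoint"),
                (record, context) -> {
                    if (record.equals("checkpoint")) {
                        context.commit();
                    }
                });
        return loop.committedPositions;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [2, 5]
    }
}
```

The handler never commits anything itself; it only asks, and the loop decides when the commit is safe. That mirrors the behavior described above, although the actual internals of Streams are considerably more involved.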

Matthias J. Sax answered Oct 06 '22 01:10