Is there a way to commit manually with Kafka Stream?
Usually with using the KafkaConsumer
, I do something like below:
while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records){ // process records } consumer.commitAsync(); }
Where I'm calling commit manually. I don't see a similar API for KStream
.
Method SummaryManually assign a list of partition to this consumer. Get the set of partitions currently assigned to this consumer. Close the consumer, waiting indefinitely for any needed cleanup. Commit offsets returned on the last poll() for all the subscribed list of topics and partition.
By default, the consumer is configured to auto-commit offsets. Using auto-commit gives you “at least once” delivery: Kafka guarantees that no messages will be missed, but duplicates are possible. Auto-commit basically works as a cron with a period set through the auto.commit.interval.ms configuration property.
Asynchronous Commits. Each call to the commit API results in an offset commit request being sent to the broker. Using the synchronous API, the consumer is blocked until that request returns successfully.
Commits are handled by Streams internally and fully automatic, and thus there is usually no reason to commit manually. Note, that Streams handles this differently than consumer auto-commit -- in fact, auto-commit is disabled for the internally used consumer and Streams manages commits "manually". The reason is, that commits can only happen at certain points during processing to ensure no data can get lost (there a many internal dependencies with regard to updating state and flushing results).
For more frequent commits, you can reduce commit interval via StreamsConfig
parameter commit.interval.ms
.
Nevertheless, manual commits are possible indirectly, via low-level Processor API. You can use the context
object that is provided via init()
method to call context#commit()
. Note, that this is only a "request to Streams" to commit as soon as possible -- it's not issuing a commit directly.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With