Our requirement is that whenever a Kafka Streams app consumes a partition, it should start consuming from the latest offset of that partition.
This seems doable using:
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
Now suppose that, with the above configuration, the Kafka Streams app starts consuming from the latest offset of a partition and then crashes after some time. When the app comes back up, we want it to consume from the latest offset of that partition again, instead of resuming from where it last stopped reading.
But I can't find anything in the Kafka Streams API that helps achieve this.
P.S. We are using kafka-1.0.0.
That is not supported out of the box.
The auto.offset.reset configuration only takes effect if there are no committed offsets, and there is no config to change this behavior.
You could manipulate the offsets manually before startup using bin/kafka-consumer-groups.sh, though: the application.id is the group.id, so you could "seek to end" before you restart the application.
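As a rough sketch of that "seek to end" step: the script below assembles a kafka-consumer-groups.sh call with the --reset-offsets and --to-latest options, which, as far as I know, have been available since 0.11.0. The broker address, application.id, and topic name are hypothetical placeholders, not values from the question. The script only prints the command unless CONFIRM=yes is set, because the reset overwrites the group's committed offsets.

```shell
#!/bin/sh
# Hypothetical values; replace them with your own.
KAFKA_BIN="${KAFKA_BIN:-bin}"                 # directory holding the Kafka scripts
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"      # assumed broker address
APP_ID="${APP_ID:-my-streams-app}"            # application.id, which is also the group.id
INPUT_TOPIC="${INPUT_TOPIC:-my-input-topic}"  # assumed input topic name

# Move the group's committed offsets for the input topic to the log end.
# The Streams application must be stopped: offsets can only be reset
# while the consumer group is inactive.
seek_group_to_end() {
  set -- "$KAFKA_BIN/kafka-consumer-groups.sh" \
    --bootstrap-server "$BOOTSTRAP" \
    --group "$APP_ID" \
    --topic "$INPUT_TOPIC" \
    --reset-offsets --to-latest --execute
  if [ "${CONFIRM:-no}" = "yes" ]; then
    "$@"                   # actually reset the offsets
  else
    printf '%s\n' "$*"     # print only, so the command can be reviewed first
  fi
}

seek_group_to_end
```

Run it once to review the printed command, then again with CONFIRM=yes while the application is offline, and restart the application afterwards.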
Update:
Since the 1.1.0 release, you can use the bin/kafka-streams-application-reset.sh tool to set starting offsets. To use the tool, the application must be offline. (cf. https://cwiki.apache.org/confluence/display/KAFKA/KIP-171+-+Extend+Consumer+Group+Reset+Offset+for+Stream+Application)
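A minimal sketch of how the reset tool might be invoked with the --to-latest option described in KIP-171. The broker address, application.id, and input topic are hypothetical placeholders. Note that this tool does more than move offsets (it also cleans up the application's internal topics), so the script prints the command by default and only runs it when CONFIRM=yes is set.

```shell
#!/bin/sh
# Hypothetical values; replace them with your own.
KAFKA_BIN="${KAFKA_BIN:-bin}"                 # directory holding the Kafka scripts
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"      # assumed broker address
APP_ID="${APP_ID:-my-streams-app}"            # the application.id of the Streams app
INPUT_TOPIC="${INPUT_TOPIC:-my-input-topic}"  # assumed input topic name

# Reset the application so that it restarts from the end of its input topic.
# The application must be offline while the tool runs.
reset_app_to_latest() {
  set -- "$KAFKA_BIN/kafka-streams-application-reset.sh" \
    --application-id "$APP_ID" \
    --bootstrap-servers "$BOOTSTRAP" \
    --input-topics "$INPUT_TOPIC" \
    --to-latest
  if [ "${CONFIRM:-no}" = "yes" ]; then
    "$@"                   # actually perform the reset
  else
    printf '%s\n' "$*"     # print only, so the command can be reviewed first
  fi
}

reset_app_to_latest
```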