How to always consume from latest offset in kafka-streams

Our requirement is that whenever a kafka-streams app consumes a partition, it should start consuming from the latest offset of that partition.

This seems doable using

streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

Now, let's say that with the above configuration, the kafka-streams app started consuming data from the latest offset of a partition, and after some time the app crashes. When the app comes back up, we want it to consume from the latest offset of that partition again, instead of from where it left off.

But I can't find anything that can help achieve it using kafka-streams api.

P.S. We are using kafka-1.0.0.

Saloni Vithalani asked Jan 19 '18 15:01



1 Answer

That is not supported out of the box.

The auto.offset.reset configuration only takes effect if there are no committed offsets, and there is no config to change this behavior.
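To make that rule concrete, here is a simplified model of the decision, not Kafka's actual code. The class and method names are hypothetical, and the sketch ignores the case where a committed offset is out of range (which also falls back to the reset policy).

```java
import java.util.OptionalLong;

// Toy model of how a consumer picks its starting position.
// Hypothetical names for illustration only; this is not Kafka's implementation.
class StartingOffsetModel {

    static long startingOffset(OptionalLong committed, String autoOffsetReset,
                               long beginningOffset, long endOffset) {
        // A committed offset always wins; auto.offset.reset is not consulted.
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        // Only when nothing is committed does the reset policy apply.
        switch (autoOffsetReset) {
            case "latest":
                return endOffset;
            case "earliest":
                return beginningOffset;
            default:
                throw new IllegalStateException(
                        "no committed offset and policy is " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // First start: nothing committed, so "latest" applies.
        System.out.println(startingOffset(OptionalLong.empty(), "latest", 0L, 500L)); // prints 500
        // Restart after a crash: offset 120 was committed, the log has grown to 900.
        System.out.println(startingOffset(OptionalLong.of(120L), "latest", 0L, 900L)); // prints 120
    }
}
```

The second call is the situation in the question: after the crash, the committed offset is used and "latest" is never looked at.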

You could, however, manipulate the offsets manually before startup using bin/kafka-consumer-groups.sh: the application.id is the group.id, so you could "seek to end" before you restart the application.
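If the "seek to end" step should run automatically before every start, one option is a small launcher step that invokes the tool. The following is only a sketch: the install path /opt/kafka, the broker address localhost:9092, and the application id my-streams-app are placeholders, and the class name is hypothetical. It relies on the --reset-offsets, --to-latest, --all-topics and --execute options of kafka-consumer-groups.sh, which require the group to be inactive when the command runs.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that assembles the offset-reset command for a Streams app.
class ResetToLatestCommand {

    // The Streams application.id doubles as the consumer group.id.
    static List<String> build(String kafkaHome, String bootstrapServer, String applicationId) {
        return Arrays.asList(
                kafkaHome + "/bin/kafka-consumer-groups.sh",
                "--bootstrap-server", bootstrapServer,
                "--group", applicationId,
                "--reset-offsets", "--to-latest", "--all-topics",
                "--execute");
    }

    public static void main(String[] args) {
        // Placeholder values; adjust to your environment.
        List<String> cmd = build("/opt/kafka", "localhost:9092", "my-streams-app");
        // Print the command for review instead of running it.
        System.out.println(String.join(" ", cmd));
        // To actually run it while the app is stopped, something like:
        //   new ProcessBuilder(cmd).inheritIO().start().waitFor();
    }
}
```

Printing first and executing only after review is deliberate: the command rewrites the committed offsets of the whole group, so it is worth checking the group name before running it.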

Update:

Since the 1.1.0 release, you can use the bin/kafka-streams-application-reset.sh tool to set starting offsets. To use the tool, the application must be offline. (cf. https://cwiki.apache.org/confluence/display/KAFKA/KIP-171+-+Extend+Consumer+Group+Reset+Offset+for+Stream+Application)

Matthias J. Sax answered Sep 19 '22 01:09
