I'm trying to consume data from a Kafka server using the @KafkaListener annotation.
The problem is that I'm getting all the messages every time I restart my application. How do I save the last consumed offset in my application and use it to consume next messages?
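How to change the consumer offset? Use the kafka-consumer-groups.sh tool to change or reset the offset. You have to specify the topic and the consumer group, and use the --reset-offsets flag together with a target such as --to-earliest or --to-offset. The change is only applied when --execute is also passed, and the consumer group must not be active at that time.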
Committing manually: You can disable auto-commit by setting enable.auto.commit=false and then commit manually by calling either commitSync() or commitAsync(), depending on your use case.
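As a rough sketch of the settings involved, the consumer properties could be assembled as shown here. The keys `bootstrap.servers`, `group.id`, `enable.auto.commit` and `auto.offset.reset` are standard Kafka consumer settings; the class name, broker address and group name are placeholders made up for illustration. In a real application these properties would be passed to the Kafka consumer, which is left out so that the snippet runs without the Kafka client library.

```java
import java.util.Properties;

public class ManualCommitConfigSketch {

    // Builds consumer settings with auto-commit disabled.
    // bootstrapServers and groupId are supplied by the caller (placeholders in main).
    public static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // A stable group id lets the broker remember committed offsets across restarts.
        props.setProperty("group.id", groupId);
        // Offsets are committed by the application (commitSync/commitAsync), not automatically.
        props.setProperty("enable.auto.commit", "false");
        // Only used when the group has no committed offset yet.
        props.setProperty("auto.offset.reset", "latest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "my-app");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The important part for the restart problem is the combination of a fixed `group.id` and actually committing offsets, whether automatically or manually.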
Kafka maintains a numerical offset for each record in a partition. This offset acts as a unique identifier of a record within that partition, and also denotes the position of the consumer in the partition.
OFFSET IN KAFKA: The offset is a unique ID assigned to each message within a partition. Its most important use is to identify the messages stored in that partition. For a consumer, it also marks the position within the partition of the next message to be delivered.
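To make the idea of offsets and committed positions concrete, here is a toy in-memory model of a single partition. It is not the Kafka API: the class `OffsetModelSketch` and its methods are invented for illustration, and it assumes a group without a committed offset starts from the beginning of the partition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OffsetModelSketch {

    // One partition: the list index of a record is its offset.
    private final List<String> partitionLog = new ArrayList<>();

    // Committed position per consumer group: the offset of the NEXT record to read.
    private final Map<String, Long> committed = new HashMap<>();

    // Appends a record and returns the offset assigned to it.
    public long append(String record) {
        partitionLog.add(record);
        return partitionLog.size() - 1;
    }

    // Reads everything from the group's committed position, then commits the new position.
    // A group with no committed position starts at offset 0 (an assumption of this model).
    public List<String> pollAndCommit(String groupId) {
        long position = committed.getOrDefault(groupId, 0L);
        List<String> records = new ArrayList<>(
                partitionLog.subList((int) position, partitionLog.size()));
        committed.put(groupId, (long) partitionLog.size());
        return records;
    }

    public static void main(String[] args) {
        OffsetModelSketch partition = new OffsetModelSketch();
        partition.append("a");
        partition.append("b");
        System.out.println(partition.pollAndCommit("my-app")); // reads a and b

        partition.append("c");
        // A "restarted" consumer in the same group resumes at the committed position.
        System.out.println(partition.pollAndCommit("my-app")); // reads only c
    }
}
```

In this model, getting all messages again after a restart corresponds to polling with a group id that has no committed position, for example because the group id changes on every start or offsets are never committed.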
This tutorial has the answer: https://www.codenotfound.com/spring-kafka-boot-example.html
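The kafka.consumer.auto-offset-reset property needs to be set to 'earliest', which ensures the new consumer group will get the message sent in case the container started after the send was completed.
So you should set it to 'latest' as per your requirement. Note that this setting only applies when the consumer group has no committed offset yet. With a fixed group id and committed offsets, the consumer resumes from the last committed position after a restart.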
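The way a consumer picks its starting position can be sketched roughly as follows. This is a simplified model, not Kafka's implementation: the class and method names are hypothetical, only the `earliest` and `latest` values of the reset policy are handled, and cases such as a committed offset that no longer exists in the log are ignored.

```java
import java.util.OptionalLong;

public class StartPositionSketch {

    // committed: the group's committed offset for the partition, if there is one.
    // policy: the auto-offset-reset value, "earliest" or "latest".
    // logStartOffset / logEndOffset: first available offset and the offset after the last record.
    public static long resolve(OptionalLong committed, String policy,
                               long logStartOffset, long logEndOffset) {
        if (committed.isPresent()) {
            // A committed offset always wins; the reset policy is not consulted.
            return committed.getAsLong();
        }
        switch (policy) {
            case "earliest":
                return logStartOffset; // replay everything still in the partition
            case "latest":
                return logEndOffset;   // only records produced from now on
            default:
                throw new IllegalArgumentException("unsupported policy: " + policy);
        }
    }

    public static void main(String[] args) {
        // New group, no committed offset: the policy decides.
        System.out.println(resolve(OptionalLong.empty(), "earliest", 0L, 100L)); // 0
        System.out.println(resolve(OptionalLong.empty(), "latest", 0L, 100L));   // 100
        // Existing group with a committed offset: resumes there regardless of policy.
        System.out.println(resolve(OptionalLong.of(42L), "earliest", 0L, 100L)); // 42
    }
}
```

So if every restart replays all messages even though the policy is 'earliest', the likely cause is that no committed offset is found for the group, which is worth checking before changing the policy.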