Kafka consumer has processed the messages 1, 2, 3, 4 and the enable.auto.commit is set to false.
But on restarting the consumer, it is not reprocessing the above messages again, from CLI I could see that the offset has been incremented and there is no lag (hence it is committing).
Can you please help on this, to understand how the consumer is still committing the offsets though the property enable.auto.commit is set to false.
Below are the consumer properties
allow.auto.create.topics = true
auto.commit.interval.ms = 0
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
enable.auto.commit = false
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = EmployeeConsumer
Currently using the spring-kafka-2.5.0.RELEASE.jar as the dependency
By default, the consumer is configured to auto-commit offsets. Using auto-commit gives you “at least once” delivery: Kafka guarantees that no messages will be missed, but duplicates are possible. Auto-commit basically works as a cron with a period set through the auto.commit.interval.ms configuration property.
The Kafka default behavior If the application fails to process a message, it throws an exception, which either interrupts the while loop or is handled gracefully (within the processRetrievedRecords method). In the first case, it means that it won't commit anymore (as it happens in the poll method, not called anymore).
Kafka Consumer Offsets As we know, each message in a Kafka topic has a partition ID and an offset ID attached to it. Therefore, in order to "checkpoint" how far a consumer has been reading into a topic partition, the consumer will regularly commit the latest processed message, also known as consumer offset.
Kafka maintains a numerical offset for each record in a partition. This offset acts as a unique identifier of a record within that partition, and also denotes the position of the consumer in the partition.
You need to show your Spring configuration.
enable.auto.commit=false
tells the kafka-clients not to commit offsets, but Spring will commit offsets by default.
Set the listener container ackMode
property to AckMode.MANUAL
to disable the container commits.
Furthermore:
auto.offset.reset = latest
means that a consumer that has never committed an offset will start consuming from the current end of the topic/partition so it won't get existing records.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With