 

Spring Kafka Auto Commit Offset In Case of Failures

I am using Spring Kafka 1.2.2.RELEASE. I have a Kafka listener as a consumer that listens to a topic and indexes each document in Elasticsearch. My auto-commit offset property is set to true (the default).

I was under the impression that if the listener throws an exception (e.g., Elasticsearch is down), the offsets would not be committed and the same message would be processed on the next poll.

However, this is not happening; the consumer commits the offset on the next poll. After reading posts and documentation, I learned that this is the expected behavior: with auto commit set to true, the next poll commits all offsets.

My question is: why does the consumer call the next poll at all, and how can I prevent any offset from being committed while auto commit is true? Or do I need to set this property to false and commit manually?

asked Mar 21 '18 by rishi

People also ask

How Auto Commit works in Kafka?

By default, the consumer is configured to auto-commit offsets. Using auto-commit gives you “at least once” delivery: Kafka guarantees that no messages will be missed, but duplicates are possible. Auto-commit basically works as a cron with a period set through the auto.commit.interval.ms configuration property.
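As a sketch, these are the two Apache Kafka consumer properties that control this behavior (values shown are the defaults):

```properties
# Auto-commit is on by default; offsets are committed in the background
enable.auto.commit=true
# ...roughly every five seconds
auto.commit.interval.ms=5000
```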

Does Kafka producer commit offset?

It commits the offset, indicating that all the previous records from that partition have been processed. So, if a consumer stops and comes back later, it restarts from the last committed position (if assigned to that partition again).

What is Auto Commit interval in Kafka?

Auto commit is enabled out of the box and by default commits every five seconds. For a simple data transformation service, “processed” means, simply, that a message has come in and been transformed and then produced back to Kafka.


2 Answers

I prefer to set it to false; it is more reliable for the container to manage the offsets for you.

Set the container's AckMode to RECORD (it defaults to BATCH) and the container will commit the offset for you after the listener returns.
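A minimal sketch of that configuration, assuming a `@Bean` method inside one of your `@Configuration` classes (bean and topic names are placeholders; the `AckMode` enum lived on `AbstractMessageListenerContainer` in the 1.x/2.x line):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // Let the container, not the kafka-clients consumer, manage offsets
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
    // Commit the offset after each record the listener returns from successfully
    factory.getContainerProperties().setAckMode(AckMode.RECORD);
    return factory;
}
```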

Also consider upgrading to at least 1.3.3 (the current version is 2.1.4); 1.3.x introduced a much simpler threading model, thanks to KIP-62.

EDIT

With auto-commit, the offset will be committed regardless of success/failure. The container won't commit after a failure, unless ackOnError is true (another reason to not use auto commit).

However, that still won't help because the broker won't send the same record again. You have to perform a seek operation on the Consumer for that.

In 2.0.1 (current version is 2.1.4), we added the SeekToCurrentErrorHandler which will cause the failed and unprocessed records to be re-sent on the next poll. See the reference manual.
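A sketch of wiring that in (the setter's exact location moved around across 2.x releases; `ContainerProperties#setErrorHandler` is assumed here):

```java
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;

// assuming `factory` is your ConcurrentKafkaListenerContainerFactory bean;
// after a listener exception, the handler seeks back so the failed record
// (and any unprocessed ones) are redelivered by the next poll
factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
```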

You can also use a ConsumerAwareListener to perform the seek yourself (also added in 2.0).
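A sketch of the consumer-aware variant (2.0+), where the listener method simply declares a `Consumer` parameter; the topic name and `indexInElastic` helper are placeholders for your own code:

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.KafkaListener;

public class IndexingListener {

    @KafkaListener(topics = "docs")
    public void listen(ConsumerRecord<String, String> record, Consumer<?, ?> consumer) {
        try {
            indexInElastic(record.value());
        }
        catch (Exception e) {
            // Rewind to the failed record so the next poll returns it again
            consumer.seek(new TopicPartition(record.topic(), record.partition()),
                    record.offset());
            throw e;
        }
    }

    private void indexInElastic(String doc) { /* placeholder */ }
}
```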

With older versions (>= 1.1) you have to use a ConsumerSeekAware listener which is quite a bit more complicated.

Another alternative is to add retry so the delivery will be re-attempted according to the retry settings.
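A sketch of adding stateless retry at the factory level via Spring Retry (attempt count and back-off are illustrative; `factory` is assumed to be your container factory bean):

```java
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3)); // up to 3 attempts
FixedBackOffPolicy backOff = new FixedBackOffPolicy();
backOff.setBackOffPeriod(1000L); // wait 1s between attempts
retryTemplate.setBackOffPolicy(backOff);

// each delivery to the listener is wrapped in the retry template
factory.setRetryTemplate(retryTemplate);
```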

answered Oct 10 '22 by Gary Russell


Apparently, there can be message loss with a Spring Kafka <= 1.3.3 @KafkaListener even with ackOnError=false, if you expect Spring Kafka to automatically take care of this (or at least document it) by retrying and simply not polling again. The default behavior is just to log the error.

We were able to reproduce message loss/skip on a consumer even with spring-kafka 1.3.3.RELEASE (no Maven sources), on a single-partition topic with concurrency(1), AckOnError(false), and BatchListener(true) with AckMode(BATCH), for any runtime exception. We ended up doing retries inside the template, or exploring ConsumerSeekAware.

@GaryRussell, regarding "the broker won't send the same record again": does the consumer keep returning the next batch of messages without any commit because poll fetches from the current position it has seeked to, not from the last committed offset? Basically, a consumer need not commit at all; even with a runtime exception on every record, it would keep consuming the entire topic, and only a restart would resume from the last committed offset (producing duplicates).
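The distinction in question can be illustrated with two kafka-clients calls (a fragment, assuming an already-subscribed `consumer` and an assigned `partition`):

```java
import org.apache.kafka.clients.consumer.OffsetAndMetadata;

// position(): where the next poll() reads from; it advances every time
// records are returned, commits or not
long position = consumer.position(partition);

// committed(): the last committed offset; it only moves on a commit,
// and it is what a restart or rebalance resumes from
OffsetAndMetadata committed = consumer.committed(partition);
```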

Upgrading to 2.0+ to use ConsumerAwareListenerErrorHandler seems to require upgrading to at least Spring 5.x, which is a major upgrade.

answered Oct 10 '22 by kisna