Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Acknowledge within @KafkaListener-method without "losing" messages

we currently acknowledge messages basically with the following simplified mechanism:

  @KafkaListener(topics = "someTopic")
  public void listen(final String message, final Acknowledgment ack) {
    try {
        processMessage(message);
        ack.acknowledge();
    } catch (final IOException e) {
        // do not acknowledge here since we can temporarily not process the message
    } 

Basically, whenever we can not process the message temporarily (in cases of IOExceptions) we want to receive it at a later time again.

But this is not working since acknowledge assumes that all previous messages within the same partition were successfully processed. And in our IOException case the failing message would get skipped, but potentially be acknowledged by a different message with a higher index on the same partition.

We have some ideas how to fix this, but this would mean some nasty workaround to avoid calling acknowledge directly within the KafkaListener method. Is our use case a very specific one or isn't it more like the "default" behavior that spring kafka users would assume?

Is there a spring-kafka solution for this kind of problem? Or do you have an idea to solve this "correctly"?

like image 668
Carsten Gubernator Avatar asked Jan 04 '17 10:01

Carsten Gubernator


1 Answers

That's just the way kafka works; you could enable retry to try to get past your IOException before moving on to the next message.

You can configure an error handle to publish to failing message to another topic to replay later. Or, the error handler can stop the container to prevent any new deliveries.

When the container is restarted the message will be replayed.

like image 172
Gary Russell Avatar answered Oct 11 '22 04:10

Gary Russell