We currently acknowledge messages with the following simplified mechanism:
@KafkaListener(topics = "someTopic")
public void listen(final String message, final Acknowledgment ack) {
    try {
        processMessage(message);
        ack.acknowledge();
    } catch (final IOException e) {
        // do not acknowledge here, since we temporarily cannot process the message
    }
}
Whenever we temporarily cannot process a message (in the case of an IOException), we want to receive it again later.
But this does not work, because acknowledge assumes that all previous messages in the same partition were processed successfully. In our IOException case the failing message is skipped, and it is then implicitly acknowledged as soon as a later message with a higher offset on the same partition is acknowledged.
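To make the problem concrete, here is a minimal plain-Java sketch of the offset behavior described above. It does not use Kafka at all: the class OffsetCommitDemo and its methods are hypothetical stand-ins, and it assumes a partition's consumer position can be modeled as a single committed offset that only moves forward.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one partition's committed offset (not a Kafka API).
public class OffsetCommitDemo {

    // Next offset the consumer group would resume from on this partition.
    private long committedOffset = 0;

    // Mimics acknowledging the record at `offset`: the position moves past it,
    // which implicitly covers every lower offset as well.
    public void acknowledge(long offset) {
        committedOffset = Math.max(committedOffset, offset + 1);
    }

    public long committedOffset() {
        return committedOffset;
    }

    // Offsets that would be delivered again if the consumer restarted now.
    public List<Long> redeliveredAfterRestart(long logEndOffset) {
        List<Long> offsets = new ArrayList<>();
        for (long o = committedOffset; o < logEndOffset; o++) {
            offsets.add(o);
        }
        return offsets;
    }

    public static void main(String[] args) {
        OffsetCommitDemo partition = new OffsetCommitDemo();

        partition.acknowledge(0);   // offset 0 processed successfully
        // offset 1 fails with an IOException, so acknowledge is never called for it
        partition.acknowledge(2);   // offset 2 processed successfully

        System.out.println(partition.committedOffset());          // prints 3
        System.out.println(partition.redeliveredAfterRestart(3)); // prints []
    }
}
```

In this model, the unacknowledged record at offset 1 is never redelivered, because acknowledging offset 2 moves the committed position past it.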
We have some ideas for fixing this, but they would require a nasty workaround to avoid calling acknowledge directly within the KafkaListener method. Is our use case very specific, or is this the "default" behavior that spring-kafka users would expect?
Is there a spring-kafka solution for this kind of problem? Or do you have an idea how to solve this "correctly"?
That's just the way Kafka works; you could enable retry to try to get past your IOException before moving on to the next message.
You can configure an error handler to publish the failing message to another topic to replay later. Or, the error handler can stop the container to prevent any new deliveries.
When the container is restarted, the message will be replayed.
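As a rough sketch of the retry and dead-letter options mentioned above, one possible configuration is shown below. It assumes spring-kafka 2.8 or later (where DefaultErrorHandler and setCommonErrorHandler are available) and that ConsumerFactory and KafkaTemplate beans already exist. The class name RetryingListenerConfig and the back-off values are illustrative assumptions. For the error handler to see the failure, the listener would have to let the IOException propagate instead of catching it.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Hypothetical configuration class; names and values are assumptions.
@Configuration
public class RetryingListenerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory,
            KafkaTemplate<String, String> kafkaTemplate) {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);

        // The listener in the question takes an Acknowledgment, so manual acks are assumed.
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);

        // Once retries are exhausted, publish the failed record to a dead-letter topic
        // (by default derived from the original topic name) so it can be replayed later.
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);

        // Redeliver the failed record up to 3 more times, 1 second apart,
        // before handing it to the recoverer.
        factory.setCommonErrorHandler(
                new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3L)));

        return factory;
    }
}
```

With a setup along these lines, the failing record is retried before the consumer moves on to the next one, rather than being silently skipped. Stopping the container instead of publishing to another topic would need a different error handler and is not shown here.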