Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

when to use RecoveryCallback vs KafkaListenerErrorHandler

Tags:

spring-kafka

I'm trying to understand when should i use org.springframework.retry.RecoveryCallback and org.springframework.kafka.listener.KafkaListenerErrorHandler?

As of today, I'm using a class (implements org.springframework.retry.RecoveryCallback) to log error message and send the message to DLT and it's working. For sending a message to DLT, I'm using Spring KafkaTemplate and then I came across KafkaListenerErrorHandler and DeadLetterPublishingRecoverer. Now, can you please suggest me, how should i use KafkaListenerErrorHandler and DeadLetterPublishingRecoverer? Can this replace the RecoveryCallback?

Here is my current kafkaListenerContainerFactory code

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {

ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();

factory.setConsumerFactory(primaryConsumerFactory());
factory.setRetryTemplate(retryTemplate());
factory.setRecoveryCallback(recoveryCallback);
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.setConcurrency(1);  
factory.getContainerProperties().setMissingTopicsFatal(false);
return factory;   }
like image 837
Raj Avatar asked Mar 04 '23 06:03

Raj


1 Answers

If it's working as you want now, why change it?

There are several layers and you can choose which one to do the error handling, depending on your needs.

  • KafkaListenerErrorHandler would be invoked for each delivery attempt within the retry, so you typically won't use it with retry.
  • Retry RecoveryCallback is invoked after retries are exhausted (or immmediately if you have classified an exception as not retryable).
  • ErrorHandler - is in the container and is invoked if any listener throws an exception, not just @KafkaListeners.

With recent versions of the framework you can completely replace listener level retry with a SeekToCurrentErrorHandler configured with a DeadLetterPublishingRecoverer and a BackOff.

The DeadLetterPublishingRecoverer is intended for use in a container error handler since it needs the raw ConsumerRecord<?, ?>.

The KafkaListenerErrorHandler only has access to the spring-messaging Message<?> that is converted from the ConsumerRecord<?, ?>.

like image 149
Gary Russell Avatar answered Apr 13 '23 00:04

Gary Russell