Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spring kafka setErrorHandler deprecated replacement (boot 2.6.4)

On spring boot 2.6.4, this method is deprecated.

public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
        var factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
        configurer.configure(factory, consumerFactory());

        // deprecated
        factory.setErrorHandler(new GlobalErrorHandler());

        return factory;
    }

The global error handler class

public class GlobalErrorHandler implements ConsumerAwareErrorHandler {

    private static final Logger log = LoggerFactory.getLogger(GlobalErrorHandler.class);

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> data, Consumer<?, ?> consumer) {
        // my custom global logic (e.g. notify ops team via slack)
    }

}

What is the replacement sample for this? The doc says I should use setCommonErrorHandler, but how to implements the CommonErrorHandler interface, as no method to be overriden there.

Point is, I have to send slack notification to ops team, based on certain condition (the message tpye, which is available on kafka message header)

This is not blocking, just an annoying deprecated message though. Thanks

like image 864
Timothy Avatar asked May 14 '26 11:05

Timothy


2 Answers

I was facing exactly the same problem, so I changed the method implementation ConsumerAwareErrorHandler by

CommonErrorHandler

and implemented

handleRecord

like described in the docs and it works!

public class GlobalErrorHandler implements CommonErrorHandler {

  private static final Logger log = LoggerFactory.getLogger(GlobalErrorHandler.class);

  @Override
  public void handleRecord(
      Exception thrownException,
      ConsumerRecord<?, ?> record,
      Consumer<?, ?> consumer,
      MessageListenerContainer container) {
    log.warn("Global error handler for message: {}", record.value().toString());
  }
}

In KafkaConfig.class

  @Bean(value = "kafkaListenerContainerFactory")
  public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
      ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
    var factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, consumerFactory());

    factory.setCommonErrorHandler(new GlobalErrorHandler());

    return factory;
  }
like image 199
Matheus Nicolas Avatar answered May 17 '26 07:05

Matheus Nicolas


See the Spring for Apache Kafka documentation; legacy error handlers are replaced with CommonErrorHandler implementations.

What's New?

https://docs.spring.io/spring-kafka/docs/current/reference/html/#x28-eh

The legacy GenericErrorHandler and its sub-interface hierarchies for record an batch listeners have been replaced by a new single interface CommonErrorHandler with implementations corresponding to most legacy implementations of GenericErrorHandler. See Container Error Handlers for more information.

Container Error Handlers

https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handlers

Starting with version 2.8, the legacy ErrorHandler and BatchErrorHandler interfaces have been superseded by a new CommonErrorHandler. These error handlers can handle errors for both record and batch listeners, allowing a single listener container factory to create containers for both types of listener. CommonErrorHandler implementations to replace most legacy framework error handler implementations are provided and the legacy error handlers deprecated. The legacy interfaces are still supported by listener containers and listener container factories; they will be deprecated in a future release.

like image 43
Gary Russell Avatar answered May 17 '26 05:05

Gary Russell