On Spring Boot 2.6.4, the setErrorHandler method used below is deprecated.
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
var factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
configurer.configure(factory, consumerFactory());
// deprecated
factory.setErrorHandler(new GlobalErrorHandler());
return factory;
}
The global error handler class
public class GlobalErrorHandler implements ConsumerAwareErrorHandler {
private static final Logger log = LoggerFactory.getLogger(GlobalErrorHandler.class);
@Override
public void handle(Exception thrownException, ConsumerRecord<?, ?> data, Consumer<?, ?> consumer) {
// my custom global logic (e.g. notify ops team via slack)
}
}
What is the replacement for this? The docs say I should use setCommonErrorHandler, but how do I implement the CommonErrorHandler interface, as there seems to be no method to override there?
The point is, I have to send a Slack notification to the ops team based on a certain condition (the message type, which is available in a Kafka message header).
This is not blocking, just an annoying deprecation warning. Thanks.
I was facing exactly the same problem, so I changed the class to implement CommonErrorHandler instead of ConsumerAwareErrorHandler and implemented handleRecord as described in the docs, and it works!
public class GlobalErrorHandler implements CommonErrorHandler {
private static final Logger log = LoggerFactory.getLogger(GlobalErrorHandler.class);
@Override
public void handleRecord(
Exception thrownException,
ConsumerRecord<?, ?> record,
Consumer<?, ?> consumer,
MessageListenerContainer container) {
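// Pass the value directly: calling toString() would throw a NullPointerException on null values (e.g. tombstones)
log.warn("Global error handler for message: {}", record.value());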
}
}
In KafkaConfig.class
@Bean(value = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
var factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, consumerFactory());
factory.setCommonErrorHandler(new GlobalErrorHandler());
return factory;
}
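Since the question asks about notifying only for certain message types taken from a header, here is a minimal sketch of that decision as a plain helper class. The class name SlackNotificationRule, the header name "messageType", and the alerting types are all hypothetical; they are assumptions for illustration, not something from the question or from Spring Kafka.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Set;

/**
 * Hypothetical helper: decides whether a failed record warrants a Slack alert,
 * based on the raw bytes of a message-type header.
 */
public final class SlackNotificationRule {

    // Assumed header name and alerting types; adjust to your own conventions.
    public static final String MESSAGE_TYPE_HEADER = "messageType";
    private static final Set<String> ALERTING_TYPES = Set.of("PAYMENT", "ORDER");

    private SlackNotificationRule() {
    }

    /**
     * @param headerValue raw header bytes, or null when the header is absent
     * @return true when the decoded type is one of the alerting types
     */
    public static boolean shouldNotify(byte[] headerValue) {
        if (headerValue == null) {
            return false;
        }
        // Kafka header values are raw bytes; this sketch assumes UTF-8 text.
        String type = new String(headerValue, StandardCharsets.UTF_8).trim();
        return ALERTING_TYPES.contains(type.toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        byte[] payment = "payment".getBytes(StandardCharsets.UTF_8);
        byte[] audit = "audit".getBytes(StandardCharsets.UTF_8);
        System.out.println(shouldNotify(payment)); // prints "true"
        System.out.println(shouldNotify(audit));   // prints "false"
        System.out.println(shouldNotify(null));    // prints "false"
    }
}
```

Inside handleRecord, the header bytes could be obtained with record.headers().lastHeader(SlackNotificationRule.MESSAGE_TYPE_HEADER), which returns null when the header is missing, so check for null before calling value() on it. Keeping the rule in a separate class means it can be unit tested without a running consumer.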
See the Spring for Apache Kafka documentation; legacy error handlers are replaced with CommonErrorHandler implementations.
What's New?
https://docs.spring.io/spring-kafka/docs/current/reference/html/#x28-eh
The legacy GenericErrorHandler and its sub-interface hierarchies for record and batch listeners have been replaced by a new single interface CommonErrorHandler with implementations corresponding to most legacy implementations of GenericErrorHandler. See Container Error Handlers for more information.
Container Error Handlers
https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handlers
Starting with version 2.8, the legacy ErrorHandler and BatchErrorHandler interfaces have been superseded by a new CommonErrorHandler. These error handlers can handle errors for both record and batch listeners, allowing a single listener container factory to create containers for both types of listener. CommonErrorHandler implementations to replace most legacy framework error handler implementations are provided and the legacy error handlers deprecated. The legacy interfaces are still supported by listener containers and listener container factories; they will be deprecated in a future release.