I am using Avro and Schema registry with my Spring Kafka setup.
I would like to somehow handle the SerializationException
, which might be thrown during deserialization.
I found the following two resource:
https://github.com/spring-projects/spring-kafka/issues/164
How do I configure spring-kafka to ignore messages in the wrong format?
These resources suggest that I return null instead of throwing an SerializationException
when deserializing and listen for KafkaNull
. This solution works just fine.
I would however like to be able to throw an exception instead of returning null.
KIP-161 and KIP-210 provide better features to handling exceptions. I did find some resources mentioning KIP-161 in Spring Cloud, but nothing specific about Spring-Kafka.
Does anyone know how to catch SerializationException
in Spring Boot?
I am using Spring Boot 2.0.2
Edit: I found a solution.
I would rather throw an exception and catch it than having to return null or KafkaNull
. I am using my custom Avro serializer and deserializer in multiple different project, some of which are not Spring. If I changed my Avro serializer and deserializer then some of the other projects would need to be changed to expect the deserializer to return null.
I would like to shutdown the container, such that I do not lose any messages. The SerializationException should never be expected in production. The SerializationException should only be able to happen if Schema Registry is down or if an unformatted message somehow is sent to the production kafka. Either way, SerializationException should only happen very rarely, and if it happens then I want to shutdown the container such that no messages are lost and I can investigate the issue.
Just take into consideration that will catch all exceptions from your consumer container. In my specific case I just want to only shutdown if it is a SerializationException
public class SerializationExceptionHandler extends ContainerStoppingErrorHandler {
@Override
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
MessageListenerContainer container) {
//Only call super if the exception is SerializationException
if (thrownException instanceof SerializationException) {
//This will shutdown the container.
super.handle(thrownException, records, consumer, container);
} else {
//Wrap and re-throw the exception
throw new KafkaException("Kafka Consumer Container Error", thrownException);
}
}
}
This handler is passed to the consumer container. Below is an example of a
KafkaListenerContainerFactory
bean.
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory(JpaTransactionManager jpa, KafkaTransactionManager<?, ?> kafka) {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(1);
factory.getContainerProperties().setPollTimeout(3000);
factory.getContainerProperties().setErrorHandler(new SerializationExceptionHandler());
factory.getContainerProperties().setTransactionManager(chainedTxM(jpa, kafka));
return factory;
}
The consumer is not able to handle the poison pill. The consumption of the topic partition is blocked because the consumer offset is not moving forward. The consumer will try again and again (very rapidly) to deserialize the record but will never succeed.
SerializationException(String, Exception) Initializes a new instance of the SerializationException class with a specified error message and a reference to the inner exception that is the cause of this exception.
The ErrorHandlingDeserializer2 is really for deserialization problem. Those failed records don't reach your listener at all. For listener side we provide a KafkaListenerErrorHandler and that one really can deal with retries. See more in Docs: docs.spring.io/spring-kafka/docs/2.3.1.RELEASE/reference/html/…
There is nothing Spring can do; the deserialization occurs before the consumer gets any data. You need to enhance the deserializer.
I would however like to be able to throw an exception instead of returning null.
That won't help anything since Kafka won't know how to deal with the exception. Again; this all happens before the data is available so returning null (or some other special value) is the best technique.
EDIT
In 2.2, we added an error handling deserializer which delegates to the actual deserializer and returns null, with the exception in a header; the listener container then passes this directly to the error handler instead of the listener.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With