Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to handle SerializationException after deserialization

I am using Avro and Schema registry with my Spring Kafka setup.

I would like to somehow handle the SerializationException, which might be thrown during deserialization.

I found the following two resource:

https://github.com/spring-projects/spring-kafka/issues/164

How do I configure spring-kafka to ignore messages in the wrong format?

These resources suggest that I return null instead of throwing an SerializationException when deserializing and listen for KafkaNull. This solution works just fine.

I would however like to be able to throw an exception instead of returning null.

KIP-161 and KIP-210 provide better features to handling exceptions. I did find some resources mentioning KIP-161 in Spring Cloud, but nothing specific about Spring-Kafka.

Does anyone know how to catch SerializationException in Spring Boot?

I am using Spring Boot 2.0.2

Edit: I found a solution.

I would rather throw an exception and catch it than having to return null or KafkaNull. I am using my custom Avro serializer and deserializer in multiple different project, some of which are not Spring. If I changed my Avro serializer and deserializer then some of the other projects would need to be changed to expect the deserializer to return null.

I would like to shutdown the container, such that I do not lose any messages. The SerializationException should never be expected in production. The SerializationException should only be able to happen if Schema Registry is down or if an unformatted message somehow is sent to the production kafka. Either way, SerializationException should only happen very rarely, and if it happens then I want to shutdown the container such that no messages are lost and I can investigate the issue.

Just take into consideration that will catch all exceptions from your consumer container. In my specific case I just want to only shutdown if it is a SerializationException

public class SerializationExceptionHandler extends ContainerStoppingErrorHandler {

    @Override
    public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
                       MessageListenerContainer container) {

        //Only call super if the exception is SerializationException
        if (thrownException instanceof SerializationException) {
            //This will shutdown the container.
            super.handle(thrownException, records, consumer, container);
        } else {
            //Wrap and re-throw the exception
            throw new KafkaException("Kafka Consumer Container Error", thrownException);
        }
    }
}

This handler is passed to the consumer container. Below is an example of a KafkaListenerContainerFactory bean.

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory(JpaTransactionManager jpa, KafkaTransactionManager<?, ?> kafka) {

    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(1);
    factory.getContainerProperties().setPollTimeout(3000);

    factory.getContainerProperties().setErrorHandler(new SerializationExceptionHandler());

    factory.getContainerProperties().setTransactionManager(chainedTxM(jpa, kafka));
    return factory;
}
like image 940
kkflf Avatar asked Jul 02 '18 13:07

kkflf


People also ask

Can your Kafka consumers handle a poison pill?

The consumer is not able to handle the poison pill. The consumption of the topic partition is blocked because the consumer offset is not moving forward. The consumer will try again and again (very rapidly) to deserialize the record but will never succeed.

What is SerializationException?

SerializationException(String, Exception) Initializes a new instance of the SerializationException class with a specified error message and a reference to the inner exception that is the cause of this exception.

What is ErrorHandlingDeserializer2?

The ErrorHandlingDeserializer2 is really for deserialization problem. Those failed records don't reach your listener at all. For listener side we provide a KafkaListenerErrorHandler and that one really can deal with retries. See more in Docs: docs.spring.io/spring-kafka/docs/2.3.1.RELEASE/reference/html/…


1 Answers

There is nothing Spring can do; the deserialization occurs before the consumer gets any data. You need to enhance the deserializer.

I would however like to be able to throw an exception instead of returning null.

That won't help anything since Kafka won't know how to deal with the exception. Again; this all happens before the data is available so returning null (or some other special value) is the best technique.

EDIT

In 2.2, we added an error handling deserializer which delegates to the actual deserializer and returns null, with the exception in a header; the listener container then passes this directly to the error handler instead of the listener.

like image 67
Gary Russell Avatar answered Oct 14 '22 05:10

Gary Russell