
How to skip corrupt (non-serializable) messages in Spring Kafka Consumer?

This question is for Spring Kafka, related to Apache Kafka with High Level Consumer: Skip corrupted messages

Is there a way to configure Spring Kafka consumer to skip a record that cannot be read/processed (is corrupt)?

I am seeing a situation where the consumer gets stuck on the same record if it cannot be deserialized. This is the error the consumer throws:

Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not construct instance of java.time.LocalDate: no long/Long-argument constructor/factory method to deserialize from Number value 

The consumer polls the topic and just keeps printing the same error in a loop until the program is killed.

This happens in a @KafkaListener that has the following consumer factory configuration:

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
asked Nov 15 '18 by Shankar P S


2 Answers

If you are using an older version of Kafka, set the following consumer factory configuration in your @KafkaListener setup:

    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomDeserializer.class);

Here is the code for CustomDeserializer:

    import java.util.Map;

    import org.apache.kafka.common.serialization.Deserializer;

    import com.fasterxml.jackson.databind.ObjectMapper;

    public class CustomDeserializer implements Deserializer<Object> {

        private final ObjectMapper mapper = new ObjectMapper();

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // no configuration needed
        }

        @Override
        public Object deserialize(String topic, byte[] data) {
            try {
                return mapper.readValue(data, Object.class);
            } catch (Exception exception) {
                // Log and swallow the error; returning null lets the
                // consumer move past the corrupt record instead of
                // looping on it forever.
                System.out.println("Error in deserializing bytes " + exception);
                return null;
            }
        }

        @Override
        public void close() {
            // nothing to clean up
        }
    }

Since I want the code to be generic enough to read any kind of JSON, the payload is deserialized to Object.class. And because the exception is caught inside deserialize, a corrupt record yields null instead of an error, so the consumer does not retry it.
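For completeness, here is a minimal sketch of how such a deserializer could be wired into a listener container (the bean names, group id, and topic name here are illustrative, not from the original answer):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Corrupt records come back as null instead of blocking the poll loop
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
```

Note that with this approach the listener method still receives the record, so it should guard against a null payload and simply return for records that failed deserialization.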

answered Sep 27 '22 by Kusum


You need ErrorHandlingDeserializer: https://docs.spring.io/spring-kafka/docs/2.2.0.RELEASE/reference/html/_reference.html#error-handling-deserializer

If you can't move to the 2.2 version, consider implementing your own deserializer and returning null for those records which can't be deserialized properly.

The source code is here: https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/ErrorHandlingDeserializer2.java
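Based on the 2.2 reference docs linked above, the wrapper is configured as the value deserializer, with the real deserializer supplied as its delegate (a sketch; adapt property names to your actual setup):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2;
import org.springframework.kafka.support.serializer.JsonDeserializer;

// ...
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// The wrapper is the configured deserializer...
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
// ...and the real JsonDeserializer is its delegate; on failure the wrapper
// returns null (with the exception in a header) instead of throwing,
// so the consumer can move past the bad record
props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
```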

answered Sep 27 '22 by Artem Bilan