This question is about Spring Kafka and is related to Apache Kafka with High Level Consumer: Skip corrupted messages.
Is there a way to configure a Spring Kafka consumer to skip a record that cannot be read/processed (is corrupt)?
I am seeing a situation where the consumer gets stuck on the same record if it cannot be deserialized. This is the error the consumer throws:
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not construct instance of java.time.LocalDate: no long/Long-argument constructor/factory method to deserialize from Number value
The consumer polls the topic and just keeps printing the same error in a loop until the program is killed.
I am using a @KafkaListener with the following consumer factory configuration:
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
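A minimal sketch of how these properties are typically wired into a listener container factory; the bean layout and the localhost address below are illustrative assumptions, not code from the question:
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
@EnableKafka
@Configuration
public class KafkaConsumerConfig
{
    // Illustrative address; the question reads this from configuration.
    private final String bootstrapServers = "localhost:9092";
    @Bean
    public ConsumerFactory<String, Object> consumerFactory()
    {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory()
    {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}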
In case you are using an older version of Spring Kafka, set the following consumer factory configuration for your @KafkaListener:
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomDeserializer.class);
Here is the code for CustomDeserializer:
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import com.fasterxml.jackson.databind.ObjectMapper;
public class CustomDeserializer implements Deserializer<Object>
{
    // ObjectMapper is thread-safe and expensive to create, so reuse one instance.
    private final ObjectMapper mapper = new ObjectMapper();
    @Override
    public void configure( Map<String, ?> configs, boolean isKey )
    {
    }
    @Override
    public Object deserialize( String topic, byte[] data )
    {
        try
        {
            return mapper.readValue(data, Object.class);
        }
        catch ( Exception exception )
        {
            // Swallow the error and return null so the consumer skips the
            // corrupt record instead of failing on every poll.
            System.out.println("Error in deserializing bytes " + exception);
            return null;
        }
    }
    @Override
    public void close()
    {
    }
}
Since I want my code to be generic enough to read any kind of JSON, I deserialize to Object.class (object = mapper.readValue(data, Object.class)). And because the exception is caught inside the deserializer, a record that fails to deserialize won't be retried once it has been read.
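With this deserializer in place, a corrupt record reaches the listener as a null value, so the listener should guard for it and move on. A minimal sketch, assuming a record-style listener; the topic name and class name are illustrative:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class MyTopicListener
{
    @KafkaListener(topics = "my-topic")
    public void listen(ConsumerRecord<String, Object> record)
    {
        Object value = record.value();
        if (value == null)
        {
            // CustomDeserializer swallowed the exception and returned null:
            // treat the record as corrupt and skip it.
            return;
        }
        // Normal processing of the deserialized value goes here.
        System.out.println("Received: " + value);
    }
}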
You need the ErrorHandlingDeserializer2: https://docs.spring.io/spring-kafka/docs/2.2.0.RELEASE/reference/html/_reference.html#error-handling-deserializer
If you can't move to that 2.2 version, consider implementing your own and returning null for those records which can't be deserialized properly.
The source code is here: https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/ErrorHandlingDeserializer2.java
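Based on the documentation linked above, a minimal sketch of wiring ErrorHandlingDeserializer2 around the original key/value deserializers; the props map mirrors the one from the question:
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2;
import org.springframework.kafka.support.serializer.JsonDeserializer;
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// Register the error-handling wrapper as the actual deserializer...
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
// ...and supply the real deserializers as delegates. When a delegate throws,
// the wrapper returns null and records the DeserializationException in a
// header instead of failing every poll.
props.put(ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);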