Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Kafka - How to skip a bad message in an offset and consume the rest

I am using Kafka with Avro for serialization/de-serialization for processing events. If by chance a bad data that doesnt comply to the avro schema gets in the topic,

.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer app: host: dcId: envId: url: reqId: jsess: secSessId: logUser: effUser: impUser: channelName: - Container exception
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition EventProcessor-0 at offset 2845. If needed, please seek past the record to continue consumption.

and the message keeps growing for the same offset. Is there a possibility to just skip this offset and continue to read from the further offsets and if the same happens again, skip that too ?

Consumer code:

@KafkaListener(topics = "EventProcessor", containerFactory = "eventProcessorListenerContainerFactory")
    public void listen(Event payLoad) {

        System.out.println("REceived  message ===> " + payLoad);

    }

Factory :

@Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Event>> eventProcessorListenerContainerFactory() {

        Map<String, Object> propMap = new HashMap<String, Object>();
        propMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        propMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propMap.put(ConsumerConfig.GROUP_ID_CONFIG, "EventProcessorConsumer");

        DefaultKafkaConsumerFactory<String, Event> consuemrFactory = new DefaultKafkaConsumerFactory<String, Event>(
                propMap);

        consuemrFactory.setValueDeserializer(new AvroDeSerializer<Event>(
                Event.class));
        ConcurrentKafkaListenerContainerFactory<String, Event> listenerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        listenerFactory.setConsumerFactory(consuemrFactory);
        listenerFactory.setConcurrency(3);
        listenerFactory.setRetryTemplate(retryTemplate());
        listenerFactory.getContainerProperties().setPollTimeout(3000);
        return listenerFactory;
    }
like image 836
Poppy Avatar asked Oct 12 '25 04:10

Poppy


1 Answers

Try to adjust your policy as suggested by @Poppy

SimpleRetryPolicy policy = new SimpleRetryPolicy();
// Set the max retry attempts
policy.setMaxAttempts(5);
// Retry on all exceptions (this is the default)
policy.setRetryableExceptions(new Class[] {Exception.class});
// ... but never retry SerializationException
policy.setFatalExceptions(new Class[] {SerializationException.class}); //<-- here

// Use the policy...
RetryTemplate template = new RetryTemplate();
template.setRetryPolicy(policy);
template.execute(new RetryCallback<Foo>() {
    public Foo doWithRetry(RetryContext context) {
        // business logic here
    }
});

From here: https://docs.spring.io/spring-batch/3.0.x/reference/html/retry.html

like image 166
Balázs Németh Avatar answered Oct 14 '25 19:10

Balázs Németh