I am using Kafka with Avro for serialization/deserialization to process events. If a bad record that doesn't comply with the Avro schema gets into the topic, the consumer logs the following exception:
.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer app: host: dcId: envId: url: reqId: jsess: secSessId: logUser: effUser: impUser: channelName: - Container exception
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition EventProcessor-0 at offset 2845. If needed, please seek past the record to continue consumption.
and this error keeps repeating for the same offset. Is it possible to just skip this offset and continue reading from the later offsets, and to skip again if the same thing happens?
Consumer code:
@KafkaListener(topics = "EventProcessor", containerFactory = "eventProcessorListenerContainerFactory")
public void listen(Event payLoad) {
    System.out.println("Received message ===> " + payLoad);
}
Factory:
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Event>> eventProcessorListenerContainerFactory() {
    // Consumer configuration: broker address, key deserializer, consumer group
    Map<String, Object> propMap = new HashMap<String, Object>();
    propMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    propMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propMap.put(ConsumerConfig.GROUP_ID_CONFIG, "EventProcessorConsumer");
    DefaultKafkaConsumerFactory<String, Event> consumerFactory = new DefaultKafkaConsumerFactory<String, Event>(
            propMap);
    // Values are decoded with the custom Avro deserializer
    consumerFactory.setValueDeserializer(new AvroDeSerializer<Event>(
            Event.class));
    ConcurrentKafkaListenerContainerFactory<String, Event> listenerFactory = new ConcurrentKafkaListenerContainerFactory<>();
    listenerFactory.setConsumerFactory(consumerFactory);
    listenerFactory.setConcurrency(3);
    listenerFactory.setRetryTemplate(retryTemplate());
    listenerFactory.getContainerProperties().setPollTimeout(3000);
    return listenerFactory;
}
Try adjusting your retry policy as suggested by @Poppy:
SimpleRetryPolicy policy = new SimpleRetryPolicy();
// Set the max retry attempts
policy.setMaxAttempts(5);
// Retry on all exceptions (this is the default)
policy.setRetryableExceptions(new Class[] {Exception.class});
// ... but never retry SerializationException
policy.setFatalExceptions(new Class[] {SerializationException.class}); //<-- here
// Use the policy...
RetryTemplate template = new RetryTemplate();
template.setRetryPolicy(policy);
template.execute(new RetryCallback<Foo>() {
    public Foo doWithRetry(RetryContext context) {
        // business logic here
    }
});
From here: https://docs.spring.io/spring-batch/3.0.x/reference/html/retry.html
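The log above reports a "Container exception", which suggests the deserialization failure is raised while the container polls, before the listener (and therefore the retry template) is invoked. Another approach that may help in that case is to wrap the value deserializer so that a bad record yields null instead of throwing, and to ignore null payloads downstream. The following is only a minimal, dependency-free sketch of that idea: ValueDeserializer, tolerant, consume and strictInt are hypothetical names invented for illustration and are not part of Kafka or Spring Kafka. Later Spring Kafka releases ship an ErrorHandlingDeserializer built around the same idea; check the reference documentation for the version you are using.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SkipBadRecordsSketch {

    /** Hypothetical stand-in for a value deserializer; NOT Kafka's Deserializer interface. */
    interface ValueDeserializer<T> {
        T deserialize(byte[] data);
    }

    /** Wraps a delegate so that a failure yields null instead of an exception. */
    static <T> ValueDeserializer<T> tolerant(ValueDeserializer<T> delegate) {
        return data -> {
            try {
                return delegate.deserialize(data);
            } catch (RuntimeException e) {
                // Report and swallow: the caller sees null and moves on to the next record.
                System.err.println("Skipping undeserializable record: " + e.getMessage());
                return null;
            }
        };
    }

    /** Deserializes every record and drops the ones that could not be decoded. */
    static <T> List<T> consume(List<byte[]> records, ValueDeserializer<T> deserializer) {
        List<T> out = new ArrayList<>();
        for (byte[] record : records) {
            T value = deserializer.deserialize(record);
            if (value != null) {
                out.add(value);
            }
        }
        return out;
    }

    /** Toy strict deserializer (assumption for the demo): accepts only decimal integers. */
    static Integer strictInt(byte[] data) {
        return Integer.parseInt(new String(data, StandardCharsets.UTF_8));
    }

    static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // The second record is malformed and is skipped rather than blocking the others.
        List<byte[]> records = Arrays.asList(bytes("1"), bytes("oops"), bytes("3"));
        List<Integer> values = consume(records, tolerant(SkipBadRecordsSketch::strictInt));
        System.out.println(values); // prints [1, 3]
    }
}
```

In a real consumer the wrapper would delegate to the Avro deserializer, and the listener would need to tolerate a null payload. Whether silently dropping bad records is acceptable depends on the application; logging them or routing them to a separate topic are common alternatives.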