I have Spring Kafka listener initialized as
@Bean
public Map<String, Object> consumerConfig() {
final HashMap<String, Object> result = new HashMap<>();
result.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
result.put(GROUP_ID_CONFIG, groupId);
result.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
result.put(VALUE_DESERIALIZER_CLASS_CONFIG, MyKafkaJacksonRulesExecutionResultsDeserializer.class);
return result;
}
@Bean
public ConsumerFactory<Long, MessageResult> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfig());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<Long, MessageResult> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Long, MessageResult> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
containerFactory.setConsumerFactory(consumerFactory());
containerFactory.setConcurrency(KAFKA_LISTENER_THREADS_COUNT);
containerFactory.getContainerProperties().setPollTimeout(KAFKA_LISTENER_POLL_TIMEOUT);
containerFactory.getContainerProperties().setAckOnError(true);
containerFactory.getContainerProperties().setAckMode(RECORD);
return containerFactory;
}
and used as
@KafkaListener(topics = "${spring.kafka.out-topic}")
public void processSrpResults(MessageResult result) {
deserializer throws an exception during deserialization which causes infinite loop because listener can't fetch messege.
How could I make kafka listener to commit on error?
I created a subclass to the deserializer that was throwing the exception. Then I used that in my configuration as the deserializer. Your processor then has to handle null objects.
public class MyErrorHandlingDeserializer extends ExceptionThrowingDeserializer {
@Override
public Object deserialize(String topic, byte[] data) {
try {
return super.deserialize(topic, data);
} catch (Exception e) {
log.error("Problem deserializing data " + new String(data) + " on topic " + topic, e);
return null;
}
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With