My service is stuck in infinite loop when trying to handle JSON deserializer error. My service is using manual_immediate acknowledge mode with auto offset reset as false. I am using acknowledge.acknowledge() commit batch records in main code but in batch error handler, I am not able to commit offset for invalid messages. I tried ConsumerAwareBatchErrorHandler & BatchErrorHandler but isAckAfterHandle() method or consumer.commitSync() are not working.
Issue1: Need to understand the process to acknowledge batch/commit offset. Issue2: I am getting data as null. I tried to read original message from data (which is null) or thrownexception but failed.
Can Someone please help me with process to commit offset and move to next batch? I am looking to insert failed messages in dead letter or error queue and move on to next batch of messages.
Code tried:
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMsConfig);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
return props;
}
@Bean
public DefaultKafkaConsumerFactory consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer(LocationRecordDto.class));
}
@Bean(KAFKA_LISTENER)
public ConcurrentKafkaListenerContainerFactory<String, MyDTO> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, MYDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setBatchErrorHandler(new ConsumerAwareBatchErrorHandler() {
@Override
public void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer) {
if (thrownException instanceof SerializationException){
String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
String topics = s.split("-")[0];
int offset = Integer.valueOf(s.split("offset ")[1]);
int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);
TopicPartition topicPartition = new TopicPartition(topics, partition);
consumer.seek(topicPartition, offset + 1);
}
//Code to push data in error queue
//consumer.commitSync();
}
@Override
public boolean isAckAfterHandle() {
return true;
}
});
return factory;
}
You have to deal with deserialization exceptions in the listener instead of the error handler and commit the batch offsets normally.
Or consider using the new RecoveringBatchErrorHandler instead.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With