
Spring Kafka batch error handler - deserializer error handling with manual commit

My service is stuck in an infinite loop when trying to handle a JSON deserialization error. It uses MANUAL_IMMEDIATE acknowledge mode with auto commit disabled. I call acknowledgment.acknowledge() to commit batch records in the main listener code, but in the batch error handler I am not able to commit the offset for the invalid messages. I tried ConsumerAwareBatchErrorHandler and BatchErrorHandler, but neither the isAckAfterHandle() method nor consumer.commitSync() works.

Issue 1: I need to understand the process to acknowledge a batch / commit offsets from the error handler. Issue 2: the data argument is null. I tried to read the original message from data (which is null) or from thrownException, but failed.

Can someone please help me with the process to commit the offset and move on to the next batch? I want to insert the failed messages into a dead-letter/error queue and continue with the next batch of messages.

Code tried:

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMsConfig);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        return props;
    }

    @Bean
    public DefaultKafkaConsumerFactory<String, MyDTO> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
                new StringDeserializer(), new JsonDeserializer<>(MyDTO.class));
    }

    @Bean(KAFKA_LISTENER)
    public ConcurrentKafkaListenerContainerFactory<String, MyDTO> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, MyDTO> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setBatchErrorHandler(new ConsumerAwareBatchErrorHandler() {
            @Override
            public void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer) {
                if (thrownException instanceof SerializationException) {
                    // Parse topic, partition and offset out of the exception message,
                    // then seek past the bad record.
                    String s = thrownException.getMessage()
                            .split("Error deserializing key/value for partition ")[1]
                            .split(". If needed, please seek past the record to continue consumption.")[0];
                    String topic = s.split("-")[0];
                    int offset = Integer.valueOf(s.split("offset ")[1]);
                    int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);

                    TopicPartition topicPartition = new TopicPartition(topic, partition);
                    consumer.seek(topicPartition, offset + 1);
                }
                // Code to push data to an error queue
                //consumer.commitSync();
            }

            @Override
            public boolean isAckAfterHandle() {
                return true;
            }
        });
        return factory;
    }
Vikas Bansal asked Dec 03 '25 10:12


1 Answer

You have to deal with deserialization exceptions in the listener itself instead of in the error handler, and commit the batch offsets normally.
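That approach could be sketched like this (a minimal sketch, not code from the answer: when the `ErrorHandlingDeserializer` fails to deserialize a record, the record reaches the listener with a null value and the serialized exception in the `VALUE_DESERIALIZER_EXCEPTION_HEADER` header; `sendToDeadLetter`, `process` and the topic name are hypothetical placeholders):

```java
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;

public class MyBatchListener {

    @KafkaListener(topics = "my-topic", containerFactory = "KAFKA_LISTENER")
    public void listen(List<ConsumerRecord<String, MyDTO>> records, Acknowledgment ack) {
        for (ConsumerRecord<String, MyDTO> record : records) {
            Header errorHeader = record.headers()
                    .lastHeader(ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER);
            if (record.value() == null && errorHeader != null) {
                // Deserialization failed for this record: push it to the
                // dead-letter/error queue (hypothetical helper) and skip it.
                sendToDeadLetter(record);
                continue;
            }
            process(record.value()); // normal processing (hypothetical helper)
        }
        // Every record in the batch was either processed or dead-lettered,
        // so the whole batch can be acknowledged normally.
        ack.acknowledge();
    }

    private void sendToDeadLetter(ConsumerRecord<String, MyDTO> record) { /* ... */ }

    private void process(MyDTO dto) { /* ... */ }
}
```

Because bad records never throw in the listener, the error handler is never invoked for deserialization problems and the offsets advance with the normal `acknowledge()` call, avoiding the infinite redelivery loop.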

Or consider using the new RecoveringBatchErrorHandler instead.

Gary Russell answered Dec 05 '25 00:12


