 

Error handling - Consumer - Apache Kafka and Spring

I am learning to use Kafka. I have two services, a producer and a consumer.

The producer produces messages that require processing (queries to services and a database). The consumer receives these messages, processes them, and saves the result in a database.

Producer

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
...
kafkaTemplate.send(topic, message);

Consumer

@KafkaListener(topics = "....")
public void listen(@Payload String message) {
....
}

I would like all messages to be processed correctly by the consumer, but I do not know how to handle errors on the consumer side in this context. For example, a database might be temporarily unavailable, so certain messages could not be processed.

What to do in these cases?

I know that the responsibility belongs to the consumer. I could retry, but retrying several times in a row while a database is down does not seem like a good idea. And if I continue consuming messages, the offset advances and I lose the events that failed processing.

asked Nov 19 '25 by user60108


2 Answers

You have control over the Kafka consumer in the form of committing the offsets of the records it has read. If the consumer fails or restarts before an offset is committed, it will re-read the records from the last committed offset. You can set offset commits to manual and, based on the success of your business logic, decide whether to commit or not. See the sample below:

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// Disable auto-commit so offsets are committed only after successful processing
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));

final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer);  // your business logic; if it throws, nothing is committed
        consumer.commitSync(); // commit offsets only after the batch succeeded
        buffer.clear();
    }
}

consumer.commitSync() commits the offsets of the records returned by the last poll.

Also see the Kafka consumer documentation to understand consumer offsets.
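Since the question uses Spring's @KafkaListener rather than the raw consumer, here is a minimal sketch of the same idea using spring-kafka's manual acknowledgment mode. The topic name, the process() method, and the consumerFactory bean are placeholders, not from the original post:

import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;

// Switch the listener container to manual offset commits
// (consumerFactory is assumed to be defined elsewhere in your configuration).
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return factory;
}

// Acknowledge (commit) only after processing succeeds; if process() throws,
// the offset is not committed and the record will be redelivered.
@KafkaListener(topics = "my-topic") // hypothetical topic name
public void listen(String message, Acknowledgment ack) {
    process(message);  // placeholder for your business logic
    ack.acknowledge();
}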

answered Nov 21 '25 by asolanki


This link was very helpful: https://dzone.com/articles/spring-for-apache-kafka-deep-dive-part-1-error-han

Spring provides the DeadLetterPublishingRecoverer class, which handles failed records by publishing them to a dead-letter topic so they are not lost and can be reprocessed later.
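For illustration, here is a minimal sketch of wiring it up, assuming spring-kafka 2.8+ (where DefaultErrorHandler replaced SeekToCurrentErrorHandler) and an existing KafkaTemplate bean. Once the retries are exhausted, the failed record is published to a dead-letter topic named "<original topic>.DLT" by default:

import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Retry a failed record twice, 1 second apart; if it still fails,
// publish it to "<topic>.DLT" instead of blocking the consumer or losing it.
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
    return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 2));
}

With recent Spring Boot versions, an error handler bean like this is picked up by the auto-configured listener container factory, so failed messages end up on the dead-letter topic while the consumer keeps moving.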

answered Nov 21 '25 by user60108


