What is the best way to implement the dead letter queue (DLQ) concept in a Spring Boot 2.0 application using spring-kafka 2.1.x, so that every message that a @KafkaListener method of some bean fails to process is sent to some predefined Kafka DLQ topic, without losing a single message?
So a consumed Kafka record is either:
1. successfully processed, or
2. failed to be processed and sent to the DLQ topic, or
3. failed to be processed and not sent to the DLQ topic (because sending to the DLQ itself failed), so it will be consumed by the listener again.
I tried to create a listener container with a custom implementation of ErrorHandler that sends records which failed to be processed to the DLQ topic using KafkaTemplate, with auto-commit disabled and the RECORD AckMode.
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=RECORD
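For context, a minimal sketch of the kind of listener the question refers to (class name, topic, and types are assumptions):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class OrderListener {

    // any exception thrown here propagates to the container's ErrorHandler configured below
    @KafkaListener(topics = "someTopic")
    public void listen(ConsumerRecord<Integer, String> record) {
        process(record); // hypothetical business logic that may fail
    }

    private void process(ConsumerRecord<Integer, String> record) {
        // ...
    }
}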
@Configuration
public class KafkaConfig {

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
        ...
        factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
        return factory;
    }
}
@Component
public class DlqErrorHandler implements ErrorHandler {

    private static final Logger log = LoggerFactory.getLogger(DlqErrorHandler.class);

    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    @Value("${dlqTopic}")
    private String dlqTopic;

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
        log.error("Error processing record, sending to DLQ...", thrownException);
        kafkaTemplate.send(dlqTopic, record.key(), record.value());
    }
}
It seems that this implementation doesn't guarantee item #3: if an exception is thrown in DlqErrorHandler, the record will not be consumed by the listener again.
Will using a transactional listener container help?
factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
Is there any convenient way to implement the DLQ concept using Spring Kafka?
UPDATE 2018/03/28
Thanks to Gary Russell's answer, I was able to achieve the desired behavior by implementing DlqErrorHandler as follows:
@Configuration
public class KafkaConfig {

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
        ...
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
        return factory;
    }
}
@Component
public class DlqErrorHandler implements ContainerAwareErrorHandler {

    ...

    @Override
    public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        ConsumerRecord<?, ?> record = records.get(0);
        try {
            kafkaTemplate.send(dlqTopic, record.key(), record.value());
            consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
            // Other records may be from other partitions, so seek to current offset for other partitions too
            // ...
        } catch (Exception e) {
            consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
            // Other records may be from other partitions, so seek to current offset for other partitions too
            // ...
            throw new KafkaException("Seek to current after exception", thrownException);
        }
    }
}
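One possible way to fill in the elided per-partition seeks (the seekPartitions helper and its signature are assumptions, not part of the original answer):

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

// Seek every partition in the failed batch back to its first unprocessed offset.
// Pass skip = 1 when the first record was handed to the DLQ, 0 when it must be replayed too.
private static void seekPartitions(List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, int skip) {
    Map<TopicPartition, Long> firstOffsets = new LinkedHashMap<>();
    // records within a partition arrive in offset order, so putIfAbsent keeps the lowest offset
    records.stream().skip(skip).forEach(rec -> firstOffsets.putIfAbsent(
            new TopicPartition(rec.topic(), rec.partition()), rec.offset()));
    firstOffsets.forEach(consumer::seek);
}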
This way, if a consumer poll returns 3 records (1, 2, 3) and the 2nd one can't be processed:
- sending the 2nd record to the DLQ is attempted; if it succeeds, the consumer seeks to record.offset() + 1 and the 3rd record is delivered next;
- if sending to the DLQ fails, the consumer seeks to record.offset() and the record will be re-delivered to the listener (and sending to the DLQ will probably be retried).
UPDATE 2021/04/30
Since Spring Kafka 2.7.0, non-blocking retries and dead letter topics are natively supported.
See the example: https://github.com/evgeniy-khist/spring-kafka-non-blocking-retries-and-dlt
Retries should usually be non-blocking (done in separate topics) and delayed; a minimal sketch follows.
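A sketch of the 2.7+ feature using @RetryableTopic (topic name, attempt count, and delays are assumptions):

import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;

@Component
public class RetryableOrderListener {

    // failed records are retried on auto-created retry topics with growing delays,
    // without blocking consumption of the main topic
    @RetryableTopic(attempts = "4", backoff = @Backoff(delay = 1000, multiplier = 2.0))
    @KafkaListener(topics = "orders")
    public void listen(String message) {
        // throwing here schedules a delayed, non-blocking retry
    }

    @DltHandler
    public void handleDlt(String message) {
        // invoked once all retries are exhausted
    }
}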
A Dead Letter Queue in Kafka is independent of the framework you use. Some components provide out-of-the-box features for error handling and Dead Letter Queues. However, it is also easy to write your own Dead Letter Queue logic for Kafka applications in any programming language like Java, Go, C++, Python, etc.
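As a sketch of that framework-free approach with the plain Kafka clients (topic name and the process method are assumptions):

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DlqLoop {

    private final KafkaConsumer<String, String> consumer;
    private final KafkaProducer<String, String> producer;

    public DlqLoop(KafkaConsumer<String, String> consumer, KafkaProducer<String, String> producer) {
        this.consumer = consumer;
        this.producer = producer;
    }

    public void pollOnce() {
        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
            try {
                process(record); // hypothetical business logic
            } catch (Exception e) {
                // forward the poison pill to the dead letter topic and keep consuming
                producer.send(new ProducerRecord<>("orders.DLT", record.key(), record.value()));
            }
        }
    }

    private void process(ConsumerRecord<String, String> record) {
        // ...
    }
}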
Next, we need two topics: a regular topic for our incoming orders, and an outgoing dead letter topic (DLT) for any orders we fail to handle successfully.
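For example, declared as beans with Spring's TopicBuilder, available since spring-kafka 2.3 (topic names, partition and replica counts are assumptions):

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class TopicConfig {

    @Bean
    public NewTopic ordersTopic() {
        return TopicBuilder.name("orders").partitions(3).replicas(1).build();
    }

    @Bean
    public NewTopic ordersDlt() {
        return TopicBuilder.name("orders.DLT").partitions(3).replicas(1).build();
    }
}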
To process messages on a dead-letter queue (DLQ), IBM MQ supplies a default DLQ handler. The handler matches messages on the DLQ against entries in a rules table that you define. Messages can be put on a DLQ by queue managers, message channel agents (MCAs), and applications.
To address the problem of blocked batches, we set up a distinct retry queue using a separately defined Kafka topic. Under this paradigm, when a consumer handler returns a failed response for a given message after a certain number of retries, the consumer publishes that message to its corresponding retry topic.
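A minimal sketch of that routing step (topic names, the x-attempts header, and the RetryRouter class are assumptions, not part of the original answer):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.kafka.core.KafkaTemplate;

public class RetryRouter {

    private static final int MAX_ATTEMPTS = 3;

    private final KafkaTemplate<String, String> template;

    public RetryRouter(KafkaTemplate<String, String> template) {
        this.template = template;
    }

    // called by the consumer after the handler fails for a record
    public void routeFailed(ConsumerRecord<String, String> record) {
        int attempts = attemptsSoFar(record);
        String target = attempts < MAX_ATTEMPTS ? "orders.retry" : "orders.DLT";
        ProducerRecord<String, String> out =
                new ProducerRecord<>(target, record.key(), record.value());
        // carry the attempt counter forward in a header
        out.headers().add("x-attempts",
                Integer.toString(attempts + 1).getBytes(StandardCharsets.UTF_8));
        template.send(out);
    }

    private int attemptsSoFar(ConsumerRecord<String, String> record) {
        Header h = record.headers().lastHeader("x-attempts");
        return h == null ? 0 : Integer.parseInt(new String(h.value(), StandardCharsets.UTF_8));
    }
}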
See the SeekToCurrentErrorHandler.
When an exception occurs, it seeks the consumer so that all unprocessed records are redelivered on the next poll.
You can use the same technique (e.g. a subclass) to write to the DLQ: seek to the current offset (and the other unprocessed records) if the DLQ write fails, and seek just the remaining records if the DLQ write succeeds.
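A sketch of that subclass idea (class name, topic, and wiring are assumptions; this is not the library's own implementation):

import java.util.List;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;

public class DlqSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {

    private final KafkaTemplate<Object, Object> kafkaTemplate;
    private final String dlqTopic;

    public DlqSeekToCurrentErrorHandler(KafkaTemplate<Object, Object> kafkaTemplate, String dlqTopic) {
        this.kafkaTemplate = kafkaTemplate;
        this.dlqTopic = dlqTopic;
    }

    @Override
    public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records,
            Consumer<?, ?> consumer, MessageListenerContainer container) {
        ConsumerRecord<?, ?> failed = records.get(0);
        try {
            // block on the future so a DLQ write failure surfaces here
            kafkaTemplate.send(dlqTopic, failed.key(), failed.value()).get();
        } catch (Exception e) {
            // DLQ write failed: seek everything back, including the failed record
            super.handle(thrownException, records, consumer, container);
            return;
        }
        // DLQ write succeeded: seek back only the records after the failed one
        if (records.size() > 1) {
            super.handle(thrownException, records.subList(1, records.size()), consumer, container);
        }
    }
}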
EDIT
The DeadLetterPublishingRecoverer was added a few months after this answer was posted.
https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters
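A minimal wiring sketch for recent spring-kafka versions (2.8+; the bean, back-off interval, and retry count are assumptions):

import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Bean
public DefaultErrorHandler errorHandler(KafkaOperations<Object, Object> template) {
    // retry twice with a 1s pause, then publish the failed record to <originalTopic>.DLT
    return new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(template), new FixedBackOff(1000L, 2L));
}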