I'm a bit confused about the poll() behaviour of (Spring) Kafka after/when stopping the ConcurrentMessageListenerContainer.
What I want to achieve: stop the consumer after an exception is raised (for example, a message could not be saved to the database), do not commit the offset, restart the consumer after a given time, and resume processing from the previously failed message.
I read this issue (https://github.com/spring-projects/spring-kafka/issues/451), which says that the container will still call the listener with the remaining records from the current poll. That means a later message from the same batch may be processed successfully after the failed one and commit an offset past the failure. This could end up in lost/skipped messages.
Is this really the case, and if so, is there a solution without upgrading to a newer version? (A DLQ is not a solution in my case.)
What I already did:
Setting setErrorHandler() and setAckOnError(false):
private Map<String, Object> getConsumerProps(CustomKafkaProps kafkaProps, Class keyDeserializer) {
    Map<String, Object> props = new HashMap<>();
    // Set common props
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProps.getBootstrapServers());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProps.getConsumerGroupId());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Start with the first message when a new consumer group (app) arrives at the topic
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // We will use "RECORD" AckMode in the Spring listener container

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);

    if (kafkaProps.isSslEnabled()) {
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        props.put("ssl.keystore.location", kafkaProps.getKafkaKeystoreLocation());
        props.put("ssl.keystore.password", kafkaProps.getKafkaKeystorePassword());
        props.put("ssl.key.password", kafkaProps.getKafkaKeyPassword());
    }
    return props;
}
Consumer
public ConcurrentMessageListenerContainer<String, byte[]> kafkaReceiverContainer(CustomKafkaProps kafkaProps) throws Exception {
    StoppingErrorHandler stoppingErrorHandler = new StoppingErrorHandler();

    ContainerProperties containerProperties = new ContainerProperties(...);
    containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
    containerProperties.setAckOnError(false);
    containerProperties.setErrorHandler(stoppingErrorHandler);

    ConcurrentMessageListenerContainer<String, byte[]> container = ...
    container.setConcurrency(1); // use only one container
    stoppingErrorHandler.setConcurrentMessageListenerContainer(container);
    return container;
}
Error Handler
public class StoppingErrorHandler implements ErrorHandler {

    @Setter
    private ConcurrentMessageListenerContainer concurrentMessageListenerContainer;

    @Value("${backends.kafka.consumer.halt.timeout}")
    int consumerHaltTimeout;

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
        // Stop the container so no further records are fetched ...
        if (concurrentMessageListenerContainer != null) {
            concurrentMessageListenerContainer.stop();
        }
        // ... and schedule a restart after the configured halt timeout.
        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {
                if (concurrentMessageListenerContainer != null && !concurrentMessageListenerContainer.isRunning()) {
                    concurrentMessageListenerContainer.start();
                }
            }
        }, consumerHaltTimeout);
    }
}
What I'm using:
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>2.1.2.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.1.7.RELEASE</version>
</dependency>
The consumer for that topic will retry processing the message. Messages that throw exceptions again can be published to a further retry topic (e.g. retry_2) and processed again, and so on. Once the maximum number of retries is reached, the message can be sent to a dead letter queue topic for manual inspection and handling.
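A minimal sketch of that retry-topic hand-off, assuming a KafkaTemplate<String, byte[]> is available and the retry count is tracked by the caller; MAX_RETRIES and the retry_/dead-letter topic names are illustrative:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.core.KafkaTemplate;

// Hedged sketch: route a failed record to the next retry topic, or to a
// dead letter topic once the retry budget is exhausted.
public class RetryForwarder {

    private static final int MAX_RETRIES = 3; // assumed retry budget

    private final KafkaTemplate<String, byte[]> template;

    public RetryForwarder(KafkaTemplate<String, byte[]> template) {
        this.template = template;
    }

    public void forward(ConsumerRecord<String, byte[]> record, int retryCount) {
        if (retryCount >= MAX_RETRIES) {
            template.send("dead-letter", record.key(), record.value()); // manual handling
        } else {
            template.send("retry_" + (retryCount + 1), record.key(), record.value());
        }
    }
}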
Stopping a Kafka consumer: a REST API can be used to stop a running consumer. However, the consumer id is needed to stop it, so the id has to be sent along. Then call POST http://localhost:8080/api/kafka/registry/deactivate, passing the id parameter of the consumer you want to stop.
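A minimal sketch of such an endpoint, assuming Spring MVC and Spring Kafka's KafkaListenerEndpointRegistry (the controller class and path are illustrative; getListenerContainer() and stop() are existing Spring Kafka APIs):

import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

// Hedged sketch: stop a @KafkaListener container by its id over REST.
@RestController
@RequestMapping("/api/kafka/registry")
public class KafkaRegistryController {

    private final KafkaListenerEndpointRegistry registry;

    public KafkaRegistryController(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    @PostMapping("/deactivate")
    public ResponseEntity<String> deactivate(@RequestParam String id) {
        MessageListenerContainer container = registry.getListenerContainer(id);
        if (container == null) {
            return ResponseEntity.status(HttpStatus.NOT_FOUND).body("unknown consumer id: " + id);
        }
        container.stop(); // stops polling; offsets committed so far stay committed
        return ResponseEntity.ok("stopped " + id);
    }
}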
The consumer is not able to handle the poison pill. Consumption of the topic partition is blocked because the consumer offset is not moving forward. The consumer will try again and again (very rapidly) to deserialize the record but will never succeed.
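One way to defuse a poison pill, in the spirit of the configuration above (which already uses ByteArrayDeserializer for values), is to deserialize inside the listener so a bad record can be logged and skipped instead of failing in poll(). A minimal sketch; the MyEvent payload type and Jackson mapping are assumptions:

import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hedged sketch: consume raw bytes and deserialize in the listener, so a
// record that cannot be deserialized is skipped and the offset still advances.
public class SafeListener {

    static class MyEvent { /* assumed payload type */ }

    private final ObjectMapper objectMapper = new ObjectMapper();

    public void onMessage(ConsumerRecord<String, byte[]> record) {
        try {
            MyEvent event = objectMapper.readValue(record.value(), MyEvent.class);
            process(event);
        } catch (IOException e) {
            System.err.printf("Skipping bad record at %s-%d@%d%n",
                    record.topic(), record.partition(), record.offset());
        }
    }

    private void process(MyEvent event) { /* business logic */ }
}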
Pausing Kafka consumers: the Kafka Consumer client has the functionality to fetch assigned partitions, pause specific partitions, fetch currently paused partitions, and resume specific partitions. These methods are all you need to pause and resume consumption dynamically.
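A minimal sketch of those four calls on a raw KafkaConsumer (recent kafka-clients; the consumer is assumed to be already subscribed). Note that poll() must still be called while paused to keep the group membership alive; it simply returns no records for the paused partitions:

import java.util.Set;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Hedged sketch: pause and resume consumption on an existing consumer.
void pauseAndResume(KafkaConsumer<String, byte[]> consumer) {
    Set<TopicPartition> assigned = consumer.assignment(); // fetch assigned partitions
    consumer.pause(assigned);                             // stop fetching from them
    Set<TopicPartition> paused = consumer.paused();       // fetch currently paused partitions
    consumer.resume(paused);                              // start fetching again
}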
Regarding "without upgrading the newer versions?": Spring Kafka 2.1 introduced the ContainerStoppingErrorHandler, which is a ContainerAwareErrorHandler; with it, the remaining unconsumed messages are discarded (and will be re-fetched when the container is restarted).
With earlier versions, your listener will need to reject (fail) the remaining messages in the batch (or set max.poll.records=1).
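For reference, a minimal sketch of wiring that built-in handler on Spring Kafka 2.1+, mirroring the container setup above (the elided constructor arguments stay as in the original):

// Hedged sketch for Spring Kafka 2.1+: ContainerStoppingErrorHandler stops the
// container and discards the remainder of the current poll, so the failed
// record is re-fetched when the container is restarted.
ContainerProperties containerProperties = new ContainerProperties(...);
containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
containerProperties.setAckOnError(false);
containerProperties.setErrorHandler(new ContainerStoppingErrorHandler());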