Kafka Consumer: Stop processing messages when exception was raised

I'm a bit confused about the poll() behaviour of (Spring) Kafka after/when stopping the ConcurrentMessageListenerContainer.

What I want to achieve: stop the consumer after an exception is raised (for example, when a message could not be saved to the database), do not commit the offset, restart the consumer after a given time, and resume processing from the previously failed message.

I read this issue (https://github.com/spring-projects/spring-kafka/issues/451), which says that the container will call the listener with the remaining records from the poll. This means the failed record's offset is not guaranteed to stay uncommitted: a later record from the same poll can be processed successfully, and its commit skips past the failure. This could end up in lost/skipped messages.

Is this really the case, and if so, is there a solution without upgrading to a newer version? (A DLQ is not a solution in my case.)

What I already did: configure setErrorHandler() and setAckOnError(false).

private Map<String, Object> getConsumerProps(CustomKafkaProps kafkaProps, Class<?> keyDeserializer) {
    Map<String, Object> props = new HashMap<>();
    //Set common props
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProps.getBootstrapServers());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProps.getConsumerGroupId());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Start with the first message when a new consumer group (app) arrives at the topic
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // We will use "RECORD" AckMode in the Spring Listener Container

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);

    if (kafkaProps.isSslEnabled()) {
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        props.put("ssl.keystore.location", kafkaProps.getKafkaKeystoreLocation());
        props.put("ssl.keystore.password", kafkaProps.getKafkaKeystorePassword());
        props.put("ssl.key.password", kafkaProps.getKafkaKeyPassword());
    }

    return props;
}

Consumer

public ConcurrentMessageListenerContainer<String, byte[]> kafkaReceiverContainer(CustomKafkaProps kafkaProps) throws Exception {
    StoppingErrorHandler stoppingErrorHandler = new StoppingErrorHandler();

    ContainerProperties containerProperties = new ContainerProperties(...);
    containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
    containerProperties.setAckOnError(false);
    containerProperties.setErrorHandler(stoppingErrorHandler);

    ConcurrentMessageListenerContainer<String, byte[]> container = ...
    container.setConcurrency(1); // use only one consumer thread (a single child container)
    stoppingErrorHandler.setConcurrentMessageListenerContainer(container);

    return container;
}

Error Handler

import java.util.Timer;
import java.util.TimerTask;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ErrorHandler;

import lombok.Setter;

public class StoppingErrorHandler implements ErrorHandler {

    @Setter
    private ConcurrentMessageListenerContainer concurrentMessageListenerContainer;

    @Value("${backends.kafka.consumer.halt.timeout}")
    int consumerHaltTimeout;

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
        // Stop the container so no further records are fetched
        if (concurrentMessageListenerContainer != null) {
            concurrentMessageListenerContainer.stop();
        }

        // Restart the container after the configured halt timeout
        // (note: this creates a new Timer thread on every error)
        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {
                if (concurrentMessageListenerContainer != null && !concurrentMessageListenerContainer.isRunning()) {
                    concurrentMessageListenerContainer.start();
                }
            }
        }, consumerHaltTimeout);
    }
}

What I'm using:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>2.1.2.RELEASE</version>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.1.7.RELEASE</version>
</dependency>
asked Feb 20 '18 by amaroqz

1 Answer

without upgrading to a newer version?

2.1 introduced the ContainerStoppingErrorHandler, which is a ContainerAwareErrorHandler; the remaining unconsumed messages from the poll are discarded (and will be re-fetched when the container is restarted).
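For illustration, a sketch of that wiring after an upgrade, mirroring the question's own snippet (the ... placeholders are kept as-is):

ContainerProperties containerProperties = new ContainerProperties(...);
containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
// On a listener exception, ContainerStoppingErrorHandler stops the container;
// the unconsumed remainder of the current poll is discarded, not delivered.
containerProperties.setErrorHandler(new ContainerStoppingErrorHandler());

ConcurrentMessageListenerContainer<String, byte[]> container = ...
container.setConcurrency(1);

Restarting the container later (for example from a scheduled task, as in the question's StoppingErrorHandler) resumes from the last committed offset, i.e. at the failed record.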

With earlier versions, your listener will need to reject (fail) the remaining messages in the batch, or set max.poll.records=1 so each poll returns a single record; sketches of both follow.
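Two sketches of those options, for illustration only (FailFastListener and process() are hypothetical names, not from this answer):

// Option 1: in the question's getConsumerProps(), cap each poll at one record,
// so no later record from the same batch can commit past a failed one.
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);

Or reject everything after the first failure inside the listener:

public class FailFastListener implements MessageListener<String, byte[]> {

    // Set on the first failure; must be reset (or the listener replaced)
    // when the container is restarted, otherwise records keep being rejected.
    private volatile boolean failed;

    @Override
    public void onMessage(ConsumerRecord<String, byte[]> record) {
        if (failed) {
            throw new IllegalStateException("Rejecting record after earlier failure");
        }
        try {
            process(record); // hypothetical processing, e.g. the database save
        } catch (RuntimeException e) {
            failed = true; // with ackOnError=false, none of these offsets are committed
            throw e;
        }
    }

    private void process(ConsumerRecord<String, byte[]> record) {
        // ... application-specific processing ...
    }
}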

answered Sep 24 '22 by Gary Russell