Kafka consumer exception and offset commits

I've been doing some proof-of-concept work with Spring Kafka. Specifically, I wanted to experiment with best practices for handling errors while consuming messages from Kafka.

I am wondering if anyone is able to help with:

  1. Sharing best practices for what Kafka consumers should do when there is a failure
  2. Helping me understand how AckMode RECORD works, and how to prevent the offset from being committed when an exception is thrown in the listener method.

The code example for 2 is given below:

Given that AckMode is set to RECORD, which according to the documentation:

commit the offset when the listener returns after processing the record.

I would have thought that the offset would not be committed if the listener method threw an exception. However, this was not the case when I tested it using the code/config/command combination below. The offset still gets updated, and the next message is processed.

My config:

    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
        return factory;
    }

My code:

    @Component
    public class KafkaMessageListener {

        @KafkaListener(topicPartitions = {
                @TopicPartition(topic = "my-replicated-topic",
                        partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0", relativeToCurrent = "true"))
        })
        public void onReplicatedTopicMessage(ConsumerRecord<Integer, String> data) throws InterruptedException {
            // Always fail, to observe whether the offset is still committed
            throw new RuntimeException("Oops!");
        }
    }

Command to verify offset:

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group

I'm using kafka_2.12-0.10.2.0 and org.springframework.kafka:spring-kafka:1.1.3.RELEASE

asked Mar 29 '17 at 05:03 by yfl



1 Answer

The container (via ContainerProperties) has a property, ackOnError, which is true by default:

/**
 * Set whether or not the container should commit offsets (ack messages) where the
 * listener throws exceptions. This works in conjunction with {@link #ackMode} and is
 * effective only when the kafka property {@code enable.auto.commit} is {@code false};
 * it is not applicable to manual ack modes. When this property is set to {@code true}
 * (the default), all messages handled will have their offset committed. When set to
 * {@code false}, offsets will be committed only for successfully handled messages.
 * Manual acks will be always be applied. Bear in mind that, if the next message is
 * successfully handled, its offset will be committed, effectively committing the
 * offset of the failed message anyway, so this option has limited applicability.
 * Perhaps useful for a component that starts throwing exceptions consistently;
 * allowing it to resume when restarted from the last successfully processed message.
 * @param ackOnError whether the container should acknowledge messages that throw
 * exceptions.
 */
public void setAckOnError(boolean ackOnError) {
    this.ackOnError = ackOnError;
}
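
For the setup in the question, switching the property off might look like the sketch below. It is written against the 1.1.x API the question uses. The body of `consumerConfigs()` and the class name `ListenerConfig` are assumptions, since the question does not show them; the broker address and group id are copied from the question. `enable.auto.commit` is set to `false` because, per the Javadoc above, ackOnError only takes effect in that case.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;

@Configuration
class ListenerConfig { // hypothetical class name

    // Assumed consumer settings; the question does not show this method.
    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        // The container, not the Kafka client, must own the commits.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
        // Do not commit the offset of a record whose listener call threw.
        factory.getContainerProperties().setAckOnError(false);
        return factory;
    }
}
```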

Bear in mind, though, that if the next message is successful, its offset will be committed anyway, which effectively commits the failed offset too.
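
To make that caveat concrete, here is a small plain-Java model of the commit behaviour. It is not container code: `AckOnErrorSimulation` and `committedOffset` are made-up names, and the model assumes a single partition, RECORD ack mode, and a container that simply moves on to the next record after a failure.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

class AckOnErrorSimulation {

    /**
     * outcomes.get(i) is true when the (hypothetical) listener succeeds for the
     * record at offset i. Returns the committed position for the partition, or
     * empty if nothing was ever committed.
     */
    static OptionalLong committedOffset(List<Boolean> outcomes, boolean ackOnError) {
        OptionalLong committed = OptionalLong.empty();
        for (int offset = 0; offset < outcomes.size(); offset++) {
            boolean succeeded = outcomes.get(offset);
            if (succeeded || ackOnError) {
                // Kafka stores the position of the next record to read,
                // so acknowledging offset N commits N + 1.
                committed = OptionalLong.of(offset + 1L);
            }
            // A failed record is not retried in this model; the loop moves on.
        }
        return committed;
    }

    public static void main(String[] args) {
        // Offset 1 fails, offset 2 succeeds: the later commit covers the failure.
        List<Boolean> failureInMiddle = Arrays.asList(true, false, true);
        System.out.println(committedOffset(failureInMiddle, true));   // position 3
        System.out.println(committedOffset(failureInMiddle, false));  // position 3 as well

        // Offsets 1 and 2 both fail: only now does ackOnError=false hold the position back.
        List<Boolean> failuresAtEnd = Arrays.asList(true, false, false);
        System.out.println(committedOffset(failuresAtEnd, false));    // position 1
    }
}
```

In this model the setting only changes the outcome when the failures run to the end of what was consumed, which matches the "starts throwing exceptions consistently" case the Javadoc describes.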

EDIT

Starting with version 2.3, ackOnError is now false by default.

answered Sep 28 '22 at 08:09 by Gary Russell