I've been doing some proof-of-concept work with Spring Kafka. Specifically, I wanted to find out what the best practices are for dealing with errors while consuming messages from Kafka.
I am wondering if anyone is able to help with:
The code example for 2 is given below:
Given that AckMode is set to RECORD, which according to the documentation:
commit the offset when the listener returns after processing the record.
I would have thought that the offset would not be advanced if the listener method threw an exception. However, this was not the case when I tested it with the code, config, and command below. The offset is still updated, and the consumer moves on to the next message.
My config:
private Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return props;
}

@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
    return factory;
}
My code:
@Component
public class KafkaMessageListener {

    @KafkaListener(topicPartitions = {@TopicPartition(topic = "my-replicated-topic",
            partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0", relativeToCurrent = "true"))})
    public void onReplicatedTopicMessage(ConsumerRecord<Integer, String> data) throws InterruptedException {
        // Always fail, to observe what happens to the committed offset.
        throw new RuntimeException("Oops!");
    }
}
Command to verify offset:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group
I'm using kafka_2.12-0.10.2.0 and org.springframework.kafka:spring-kafka:1.1.3.RELEASE
The container (via ContainerProperties) has a property, ackOnError, which is true by default...
/**
 * Set whether or not the container should commit offsets (ack messages) where the
 * listener throws exceptions. This works in conjunction with {@link #ackMode} and is
 * effective only when the kafka property {@code enable.auto.commit} is {@code false};
 * it is not applicable to manual ack modes. When this property is set to {@code true}
 * (the default), all messages handled will have their offset committed. When set to
 * {@code false}, offsets will be committed only for successfully handled messages.
 * Manual acks will be always be applied. Bear in mind that, if the next message is
 * successfully handled, its offset will be committed, effectively committing the
 * offset of the failed message anyway, so this option has limited applicability.
 * Perhaps useful for a component that starts throwing exceptions consistently;
 * allowing it to resume when restarted from the last successfully processed message.
 * @param ackOnError whether the container should acknowledge messages that throw
 * exceptions.
 */
public void setAckOnError(boolean ackOnError) {
    this.ackOnError = ackOnError;
}
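Applied to the factory from the question, turning the property off might look roughly like the sketch below. It targets the 1.1.x line used in the question. The class name KafkaConsumerConfig and the body of consumerConfigs() are assumptions, since the question does not show its consumer properties; the group id is taken from the kafka-consumer-groups.sh command and the broker address from the producer config.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;

// Hypothetical configuration class; only the factory bean mirrors the question.
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    // Assumed consumer properties (not shown in the question).
    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        // ackOnError only takes effect when the container, not the client, commits offsets.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
        // Do not commit the offset of a record whose listener invocation threw.
        factory.getContainerProperties().setAckOnError(false);
        return factory;
    }
}
```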
Bear in mind, though, that if the next message is successful, its offset will be committed anyway, which effectively commits the failed offset too.
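To make that caveat concrete, here is a toy model in plain Java. It is not Spring Kafka code; the class and method names are made up for illustration. It assumes the container moves on to the next record after a failure, and it relies on the fact that a committed offset is a single position per partition (the offset of the next record to read).

```java
import java.util.Collections;
import java.util.Set;

// Toy model of per-record offset commits; hypothetical names, not a Spring Kafka API.
final class AckOnErrorSketch {

    /**
     * Walks offsets 0..recordCount-1 in order and returns the committed position
     * (offset of the next record to read), or -1 if nothing was ever committed.
     */
    static long committedOffsetAfter(int recordCount, Set<Long> failingOffsets, boolean ackOnError) {
        long committed = -1L;
        for (long offset = 0; offset < recordCount; offset++) {
            boolean failed = failingOffsets.contains(offset);
            // Commit after a success, or after a failure when ackOnError is true.
            if (!failed || ackOnError) {
                committed = offset + 1; // implicitly covers every earlier offset too
            }
        }
        return committed;
    }

    public static void main(String[] args) {
        Set<Long> failing = Collections.singleton(1L); // the record at offset 1 throws
        System.out.println(committedOffsetAfter(3, failing, true));  // 3
        System.out.println(committedOffsetAfter(3, failing, false)); // 3: offset 2 succeeded
        System.out.println(committedOffsetAfter(2, failing, false)); // 1: failure was the last record
    }
}
```

In this model, ackOnError=false only holds the committed position back when the failing record is the last one processed, which matches the "limited applicability" note in the Javadoc.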
EDIT
Starting with version 2.3, ackOnError is now false by default.