I use Kafka and Spring Boot with Spring Kafka. After an abnormal termination and restart, my application started receiving old, already processed messages from the Kafka queue.
What could cause this, and how can I find and resolve the issue?
My Kafka properties:
spring.kafka.bootstrap-servers=${kafka.host}:${kafka.port}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=postfenix
spring.kafka.consumer.enable-auto-commit=false
My Spring Kafka factory and listener:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
    ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.getContainerProperties().setAckMode(AckMode.MANUAL);
    factory.setConsumerFactory(postConsumerFactory(kafkaProperties));
    return factory;
}

@KafkaListener(topics = "${kafka.topic.post.send}", containerFactory = "postKafkaListenerContainerFactory")
public void sendPost(ConsumerRecord<String, Post> consumerRecord, Acknowledgment ack) {
    Post post = consumerRecord.value();
    // do some logic here
    ack.acknowledge();
}
When using Kafka, the clients need to commit offsets themselves. This is in contrast to other message brokers, such as AMQP brokers, where the broker keeps track of the messages a client has already received.
In your case, offsets are not committed automatically, so Kafka expects you to commit them manually (because of this setting: spring.kafka.consumer.enable-auto-commit=false). If your program does not commit offsets, the behaviour you describe is pretty much the expected one: Kafka simply does not know which messages your program processed successfully. The same applies to offsets that were acknowledged but not yet committed when the application terminated abnormally. Each time you restart your program, Kafka sees that your consumer group has not committed any offsets yet and applies the strategy you provided in spring.kafka.consumer.auto-offset-reset=earliest, which means starting from the earliest message still available in the topic.
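To make the restart behaviour concrete, here is a minimal sketch in plain Java. It is a toy model of one partition and one consumer group, not the Kafka client or Spring Kafka API; the class OffsetResumeSketch and its methods are hypothetical names made up for illustration. It only shows the rule described above: a committed offset wins, and the reset policy is consulted only when no offset has been committed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

/** Toy model of one partition and one consumer group (hypothetical, not the Kafka API). */
class OffsetResumeSketch {

    enum ResetPolicy { EARLIEST, LATEST }

    // Records stored in the partition; the list index plays the role of the offset.
    private final List<String> log = new ArrayList<>();
    // Offset committed by the consumer group, empty until the first commit.
    private OptionalLong committed = OptionalLong.empty();

    void produce(String value) {
        log.add(value);
    }

    /** Stores the offset of the next record to read, which is what a commit records. */
    void commit(long nextOffset) {
        committed = OptionalLong.of(nextOffset);
    }

    /** Position a consumer starts from after a (re)start. */
    long startPosition(ResetPolicy policy) {
        if (committed.isPresent()) {
            return committed.getAsLong(); // the reset policy is ignored in this case
        }
        return policy == ResetPolicy.EARLIEST ? 0L : log.size();
    }

    /** Records a freshly (re)started consumer would receive. */
    List<String> poll(ResetPolicy policy) {
        int from = (int) startPosition(policy);
        return new ArrayList<>(log.subList(from, log.size()));
    }

    public static void main(String[] args) {
        OffsetResumeSketch partition = new OffsetResumeSketch();
        partition.produce("a");
        partition.produce("b");
        partition.produce("c");

        // Nothing committed: with EARLIEST every record is delivered again.
        System.out.println(partition.poll(ResetPolicy.EARLIEST)); // [a, b, c]

        // After committing offset 2, a restart resumes at "c" only.
        partition.commit(2);
        System.out.println(partition.poll(ResetPolicy.EARLIEST)); // [c]
    }
}
```

In this model, a crash before commit(2) is called leaves the group without a committed offset, so the next start falls back to the reset policy and replays everything, which matches the symptom in the question.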
If this is all new to you, I suggest reading the Kafka documentation and the Spring documentation, because Kafka is quite different from other message brokers.