I have a spring boot application in which I have single Kafka Consumer.
I am using a DefaultKafkaConsumerFactory with default Consumer Configurations. I have a ConcurrentListenerContainerFactory with concurrency set to 1, and I have a method annotated with @KafkaListener.
I am listening to a topic with 3 partitions and I have 3 of such consumers deployed each in different applications. Hence, each consumer is listening to one partition.
Lets say poll on the consumer is called under the hood and 40 records are fetched. Then is each record, provided to the method annotated with @KafkaListener serially i.e. record 1 provided, wait till method finishes processing, record 2 provided , wait till method finishes processing and so on. Does the above happen, or for every record obtained , a separate thread is created and the method invocation happens on a separate thread, so the main thread does not block and it can poll for records more quickly.
I would also like more clarity on what a message listener container is and the eventual message listener.
Thank you in advance.
In 1.3 and above there is a single thread per consumer; the next poll()
is performed after the last message from the previous poll has been processed by the listener.
In earlier versions, there were two threads and a second (and possibly third) poll was performed while the listener thread is processing the first batch. This was required to avoid a rebalance due to a slow listener. The threading model was very complicated and we had to pause/resume the consumer when necessary. KIP-62 fixed the rebalance problem so we were able to use the much simpler threading model in use today.
Well, that is exactly an Apache Kafka position - guarantee an order processing records from the same partition in the same thread. Therefore when you distribute your topic with 3 partitions between 3 instances, each of them will get its own partition and does the polling in a single thread.
The KafkaMessageListenerContainer
is an event-driven, self-controlling wrapper around KafkaConsumer
. It really calls poll()
in a while (isRunning()) {
loop, which is scheduled in a TaskExecutor
:
this.listenerConsumerFuture = containerProperties
.getConsumerTaskExecutor()
.submitListenable(this.listenerConsumer);
And it processes ConsumerRecords
calling listener:
private void invokeListener(final ConsumerRecords<K, V> records) {
if (this.isBatchListener) {
invokeBatchListener(records);
}
else {
invokeRecordListener(records);
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With