I am working with spring boot + spring @KafkaListener. And the behavior I expect is: my kafka listener reads messages in 10 threads. So that, if one of threads hangs, other messages are would continue reading and handling messages.
I defined bean of
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory)
{
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.getContainerProperties().setMissingTopicsFatal(false);
factory.getContainerProperties().setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
return factory;
}
And spring boot config:
spring.kafka.listener.concurrency=10
I see that all configs work, I see my 10 threads in jmx:
But then I make such test:
@KafkaListener(topics = {
"${topic.name}" }, clientIdPrefix = "${kafka.client.id.prefix}", idIsGroup = false, id = "${kafka.listener.name}", containerFactory = "kafkaListenerContainerFactory")
public void listen(ConsumerRecord<String, String> record)
{
if(record.getVersion() < 3) {
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
else
System.out.println("It works!");
}
If version is < 3, then hang, otherwise - work. I send 3 messages with version 1,2 and 3. I expect that messages with version 1 and 2 will hang, but version 3 will be processed at the time it comes to listener. But unfortunately message with version 3 waits for messages 1 and 2 before starts its processing.
Maybe my expectations are not true and this is a right behavior of kafka listener. Please help me to deal with kafka concurrency, why does it act like that?
Kafka doesn't work that way; you need at least as many partitions as consumers (controlled by concurrency
in the spring container).
Also, only one consumer (in a group) can consume from a partition at a time so, even if you increase the partitions, records in the same partition behind the "stuck" consumer will not be received by other consumers.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With