I use the spring-kafka annotation @KafkaListener to designate my listener method.
I consume a single topic with a single partition. Messages never arrive at more than one or two per second, so a single thread is sufficient. The spring-kafka docs say that @KafkaListener defaults to using a ConcurrentMessageListenerContainer. Is the correct way to control concurrency by using setConcurrency?
Or should I somehow create a KafkaMessageListenerContainer, which is single-threaded?
I currently use this:
@Bean("appContainerFactory")
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
        kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(1);
    ...
}
Is the correct way to control concurrency by using setConcurrency?
What you have is correct, but the default concurrency on the container is 1 so it's not necessary to specify it when you don't need concurrency.
Or should I somehow create a KafkaMessageListenerContainer, which is single-threaded?
The concurrent container spins up one "child" KafkaMessageListenerContainer per unit of concurrency, so with a concurrency of 1 exactly one is already created for you.
In your case, the concurrency setting adds no value. As long as you have a single partition, only one thread will ever be consuming from it. This is the recommended way to work with Apache Kafka: one thread per partition. Spring for Apache Kafka follows this rule and does not give us a way to break it. Even if you set the concurrency to a large number, the partition will not be polled in parallel. Concurrency starts to pay off only when you have more than one partition to consume.
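To make the point about partitions and threads concrete, here is a minimal sketch in plain Java. It is only a simplified model, not spring-kafka or Kafka client code: the class PartitionAssignmentSketch and its methods assign and activeThreads are hypothetical names invented for illustration, and the round-robin spread is an assumption standing in for whatever assignment strategy the consumer group really uses. The one property it relies on is that a partition is handed to at most one consumer thread in a group.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model only: this is NOT spring-kafka or Kafka client code.
// All names here are hypothetical and exist purely for illustration.
final class PartitionAssignmentSketch {

    // Spread partition ids 0..partitions-1 over `concurrency` consumer threads.
    // Round-robin is an assumed strategy; the key property is that each
    // partition ends up with exactly one thread. Requires concurrency >= 1.
    static List<List<Integer>> assign(int partitions, int concurrency) {
        List<List<Integer>> perThread = new ArrayList<>();
        for (int t = 0; t < concurrency; t++) {
            perThread.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            perThread.get(p % concurrency).add(p);
        }
        return perThread;
    }

    // Count the threads that actually received at least one partition.
    static long activeThreads(int partitions, int concurrency) {
        return assign(partitions, concurrency).stream()
                .filter(assigned -> !assigned.isEmpty())
                .count();
    }

    public static void main(String[] args) {
        // One partition, concurrency 5: only the first thread gets work.
        System.out.println(assign(1, 5));
        System.out.println(activeThreads(1, 5));
        // Four partitions, concurrency 2: both threads get work.
        System.out.println(activeThreads(4, 2));
    }
}
```

In this model, one partition with a concurrency of 5 leaves four of the five threads with nothing assigned, which matches the advice above: raising the concurrency helps only once there are several partitions to share out.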