I want to create a concurrent @KafkaListener that can handle multiple topics, each with a different number of partitions.
I have noticed that Spring-Kafka initializes only one consumer per partition of the topic with the most partitions.
Example: I have set concurrency to 8 and have a @KafkaListener listening to the following topics. Topic A has the most partitions (5), so Spring-Kafka initializes 5 consumers. I expected Spring-Kafka to initialize 8 consumers, which is the maximum allowed by my concurrency property.
What is the technical reason for not initializing more consumers?
How do I bypass this so that I can initialize more consumers using the @KafkaListener annotation (if that is possible at all)?
When a listener is configured to listen to multiple topics, each consumer instance subscribes to all of them. Spring does start 8 consumers in this case, but how the partitions are distributed across those consumers is controlled by Kafka's group management.
So you end up with 3 idle consumers in this case.
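To make the idle-consumer arithmetic concrete, here is a minimal sketch of a range-style assignment, assuming the group uses Kafka's range assignor (which divides each topic independently, always starting from the first consumer). It is a simplified model in plain Java, not the Kafka implementation. Only topic A with 5 partitions comes from the question; topics B and C and their partition counts are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RangeAssignmentSketch {

    // Simplified model of range-style assignment: every topic is split on its own,
    // and the split always starts at the first consumer in the group.
    static List<List<String>> assign(Map<String, Integer> partitionsPerTopic, int consumers) {
        List<List<String>> result = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            result.add(new ArrayList<>());
        }
        for (Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
            int partitions = topic.getValue();
            int base = partitions / consumers;   // partitions every consumer gets
            int extra = partitions % consumers;  // the first 'extra' consumers get one more
            int next = 0;
            for (int c = 0; c < consumers; c++) {
                int count = base + (c < extra ? 1 : 0);
                for (int i = 0; i < count; i++) {
                    result.get(c).add(topic.getKey() + "-" + next++);
                }
            }
        }
        return result;
    }

    // Number of consumers that received no partition at all.
    static long idleCount(List<List<String>> assignment) {
        return assignment.stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = new TreeMap<>();
        topics.put("A", 5); // from the question
        topics.put("B", 3); // hypothetical partition count
        topics.put("C", 1); // hypothetical partition count

        List<List<String>> assignment = assign(topics, 8); // concurrency = 8
        for (int c = 0; c < assignment.size(); c++) {
            System.out.println("consumer-" + c + " -> " + assignment.get(c));
        }
        System.out.println("idle consumers: " + idleCount(assignment)); // prints 3
    }
}
```

Because no topic has more than 5 partitions, only the first 5 consumers ever receive anything, whatever the smaller topics look like.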
It might be possible to provide a custom partition.assignment.strategy to do the distribution the way you want, but I've never looked into that.
EDIT
I just tested with the RoundRobinAssignor...
spring.kafka.consumer.properties.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
and...
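As a rough illustration of what round-robin assignment changes, the sketch below deals the partitions of all topics out as one combined list, one consumer at a time. It is a simplified plain-Java model that assumes every consumer subscribes to the same topics, not the actual RoundRobinAssignor code. As before, only topic A with 5 partitions comes from the question; topics B and C are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RoundRobinAssignmentSketch {

    // Simplified model of round-robin assignment: all partitions of all topics are
    // laid out in one sorted sequence and handed to the consumers in turn.
    static List<List<String>> assign(Map<String, Integer> partitionsPerTopic, int consumers) {
        List<List<String>> result = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            result.add(new ArrayList<>());
        }
        int next = 0; // running index over the combined partition list
        for (Map.Entry<String, Integer> topic : new TreeMap<>(partitionsPerTopic).entrySet()) {
            for (int p = 0; p < topic.getValue(); p++) {
                result.get(next % consumers).add(topic.getKey() + "-" + p);
                next++;
            }
        }
        return result;
    }

    // Number of consumers that received no partition at all.
    static long idleCount(List<List<String>> assignment) {
        return assignment.stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = new TreeMap<>();
        topics.put("A", 5); // from the question
        topics.put("B", 3); // hypothetical partition count
        topics.put("C", 1); // hypothetical partition count

        List<List<String>> assignment = assign(topics, 8); // concurrency = 8
        for (int c = 0; c < assignment.size(); c++) {
            System.out.println("consumer-" + c + " -> " + assignment.get(c));
        }
        System.out.println("idle consumers: " + idleCount(assignment)); // prints 0
    }
}
```

With these hypothetical counts there are 9 partitions in total, so all 8 consumers get work. If the topics together had fewer partitions than the concurrency value, some consumers would still be idle under this strategy.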