We have a Spring Boot application that uses Spring Kafka (2.1.7). We have enabled concurrency so that we can have one consumer thread per partition. Currently, with 3 topics of 2 partitions each, there are 2 consumer threads, as shown below:
ConsumerThread1 - [topic1-0, topic2-0, topic3-0]
ConsumerThread2 - [topic1-1, topic2-1, topic3-1]
However, instead of one KafkaListener (or consumer thread) per partition, we would like to have one consumer thread per topic. For example:
ConsumerThread1 - [topic1-0, topic1-1]
ConsumerThread2 - [topic2-0, topic2-1]
ConsumerThread3 - [topic3-0, topic3-1]
If that is not possible, even the following setup is fine:
ConsumerThread1 - [topic1-0]
ConsumerThread2 - [topic1-1]
ConsumerThread3 - [topic2-0]
ConsumerThread4 - [topic2-1]
ConsumerThread5 - [topic3-0]
ConsumerThread6 - [topic3-1]
The catch is that we do not know the complete list of topics beforehand (we are using a wildcard topic pattern). A new topic can be added at any time, and a new consumer thread (or threads) should be created for it dynamically at runtime.
Is there any way this can be achieved?
Starting with spring-kafka 2.2, you can create a separate container for each topic and set its concurrency to 1, so that each container consumes from a single topic. The relevant passage from the reference documentation:
Starting with version 2.2, you can use the same factory to create any ConcurrentMessageListenerContainer. This might be useful if you want to create several containers with similar properties or you wish to use some externally configured factory, such as the one provided by Spring Boot auto-configuration. Once the container is created, you can further modify its properties, many of which are set by using container.getContainerProperties(). The following example configures a ConcurrentMessageListenerContainer:
@Bean
public ConcurrentMessageListenerContainer<String, String> container(
        ConcurrentKafkaListenerContainerFactory<String, String> factory) {
    // Create a container subscribed to the listed topics, reusing the factory's settings
    ConcurrentMessageListenerContainer<String, String> container =
            factory.createContainer("topic1", "topic2");
    container.setMessageListener(m -> { ... } );
    return container;
}
Note: Containers created this way are not added to the endpoint registry. They should be created as @Bean definitions so that they are registered with the application context.
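Since the question is about topics that appear at runtime, here is a rough, library-independent sketch of the bookkeeping that "one container per topic" implies. The class name PerTopicContainerRegistry, its methods, and the idea of feeding it a periodically refreshed topic list are all assumptions made for illustration; none of this is part of Spring Kafka. In a real application, the function passed to the constructor might wrap factory.createContainer(topic), set the listener and concurrency, and call start(). Containers created that way would not be beans, so stopping them on shutdown would be your responsibility.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical helper (not a Spring Kafka class): keeps exactly one container per topic.
class PerTopicContainerRegistry<C> {

    private final Map<String, C> containers = new ConcurrentHashMap<>();

    // Assumption: builds (and, in a real application, starts) a container for one topic.
    private final Function<String, C> containerFactory;

    PerTopicContainerRegistry(Function<String, C> containerFactory) {
        this.containerFactory = containerFactory;
    }

    // Creates a container for every topic not seen before and
    // returns the topics that were newly added by this call.
    List<String> onTopicsDiscovered(Iterable<String> topics) {
        List<String> added = new ArrayList<>();
        for (String topic : topics) {
            containers.computeIfAbsent(topic, t -> {
                added.add(t);
                return containerFactory.apply(t);
            });
        }
        return added;
    }

    C get(String topic) {
        return containers.get(topic);
    }

    int size() {
        return containers.size();
    }
}
```

Calling onTopicsDiscovered repeatedly with the current list of matching topics only creates containers for the topics that are new, so each topic ends up with its own consumer thread when the concurrency is 1. How the topic list is obtained (for example, by polling the broker and filtering by the pattern) is left open here.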