Is it possible to have one Kafka consumer thread per topic?

We have a Spring Boot application that uses Spring Kafka (2.1.7). We have enabled concurrency so that we can have one consumer thread per partition. Currently, with 3 topics of 2 partitions each, there are 2 consumer threads, assigned as shown below:

ConsumerThread1 - [topic1-0, topic2-0, topic3-0]
ConsumerThread2 - [topic1-1, topic2-1, topic3-1]
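
The layout above is what a range-style partition assignment would produce. The following is a minimal model of that behaviour, assuming the consumer's default range assignor is in effect; the class `RangeAssignmentSketch` and its `assign` method are hypothetical names for illustration and are not part of Kafka or Spring Kafka.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of range-style assignment; not Kafka's actual implementation.
final class RangeAssignmentSketch {

    // Returns, for each consumer index, the "topic-partition" names it would own.
    static Map<Integer, List<String>> assign(Map<String, Integer> partitionsPerTopic, int consumers) {
        Map<Integer, List<String>> result = new LinkedHashMap<>();
        for (int c = 0; c < consumers; c++) {
            result.put(c, new ArrayList<>());
        }
        // Work topic by topic: each topic's partitions are split into
        // contiguous ranges, one range per consumer.
        for (Map.Entry<String, Integer> e : partitionsPerTopic.entrySet()) {
            int partitions = e.getValue();
            int perConsumer = partitions / consumers;
            int extra = partitions % consumers; // the first `extra` consumers get one more
            int next = 0;
            for (int c = 0; c < consumers; c++) {
                int count = perConsumer + (c < extra ? 1 : 0);
                for (int p = next; p < next + count; p++) {
                    result.get(c).add(e.getKey() + "-" + p);
                }
                next += count;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = new LinkedHashMap<>();
        topics.put("topic1", 2);
        topics.put("topic2", 2);
        topics.put("topic3", 2);
        // prints {0=[topic1-0, topic2-0, topic3-0], 1=[topic1-1, topic2-1, topic3-1]}
        System.out.println(assign(topics, 2));
    }
}
```

In this model, raising the consumer count to 6 still gives partitions only to the first two consumers, because each topic has just two partitions to hand out. That suggests higher concurrency alone would not spread the topics across threads.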

However, instead of one KafkaListener (or consumer thread) per partition, we would like to have one consumer thread per topic. For example:

ConsumerThread1 - [topic1-0, topic1-1]
ConsumerThread2 - [topic2-0, topic2-1]
ConsumerThread3 - [topic3-0, topic3-1]

If that is not possible, even the following setup is fine:

ConsumerThread1 - [topic1-0]
ConsumerThread2 - [topic1-1]
ConsumerThread3 - [topic2-0]
ConsumerThread4 - [topic2-1]
ConsumerThread5 - [topic3-0]
ConsumerThread6 - [topic3-1]

The catch is that we do not know the complete list of topics beforehand (we are using a wildcard topic pattern). A new topic can be added at any time, and a new consumer thread (or threads) should be created for it dynamically at runtime.

Is there any way this can be achieved?

agentwarn asked Dec 17 '22 17:12



1 Answer

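Starting with spring-kafka 2.2 (an upgrade from the 2.1.7 mentioned in the question), you can create a separate container for each topic and set its concurrency to 1, so that each container consumes from a single topic. The reference documentation describes this: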

Starting with version 2.2, you can use the same factory to create any ConcurrentMessageListenerContainer. This might be useful if you want to create several containers with similar properties or you wish to use some externally configured factory, such as the one provided by Spring Boot auto-configuration. Once the container is created, you can further modify its properties, many of which are set by using container.getContainerProperties(). The following example configures a ConcurrentMessageListenerContainer:

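@Bean
public ConcurrentMessageListenerContainer<String, String> container(
        ConcurrentKafkaListenerContainerFactory<String, String> factory) {

    ConcurrentMessageListenerContainer<String, String> container =
            factory.createContainer("topic1", "topic2");
    // The listener is held in the container properties. The cast to
    // org.springframework.kafka.listener.MessageListener tells the compiler
    // which listener interface the lambda implements.
    container.getContainerProperties().setMessageListener(
            (MessageListener<String, String>) m -> { ... });
    return container;
}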

Note: Containers created this way are not added to the endpoint registry. They should be created as @Bean definitions so that they are registered with the application context.
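
For topics that only appear at runtime, fixed @Bean definitions are not enough on their own, so the per-topic containers would have to be created as topics are discovered. Below is a minimal sketch of that bookkeeping using only the JDK. `PerTopicRegistry`, `reconcile`, and `topics` are hypothetical names, and the `containerFactory` function stands in for whatever creates and starts the real container (for example, a call to `factory.createContainer(topic)` followed by setting concurrency to 1 and starting it). How the current topic list is obtained is also left as an assumption.

```java
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Spring Kafka: tracks one container per topic.
final class PerTopicRegistry<C> {

    private final Pattern topicPattern;
    private final Function<String, C> containerFactory;
    private final Map<String, C> containers = new ConcurrentHashMap<>();

    PerTopicRegistry(Pattern topicPattern, Function<String, C> containerFactory) {
        this.topicPattern = topicPattern;
        this.containerFactory = containerFactory;
    }

    // Call periodically with the currently known topic names. Creates a
    // container for each matching topic not seen before and returns how
    // many containers were created by this call.
    int reconcile(Collection<String> currentTopics) {
        int before = containers.size();
        for (String topic : currentTopics) {
            if (topicPattern.matcher(topic).matches()) {
                // computeIfAbsent invokes the factory at most once per topic.
                containers.computeIfAbsent(topic, containerFactory);
            }
        }
        return containers.size() - before;
    }

    // Sorted snapshot of the topics that currently have a container.
    Set<String> topics() {
        return new TreeSet<>(containers.keySet());
    }

    public static void main(String[] args) {
        // The factory here just builds a label; a real one would build and start a container.
        PerTopicRegistry<String> registry =
                new PerTopicRegistry<>(Pattern.compile("topic.*"), t -> "container-for-" + t);
        registry.reconcile(Set.of("topic1", "topic2", "unrelated"));
        registry.reconcile(Set.of("topic1", "topic2", "topic3"));
        System.out.println(registry.topics()); // prints [topic1, topic2, topic3]
    }
}
```

Because containers created this way are not registered with the application context, the code that creates them would also be responsible for stopping them on shutdown.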

Deadpool answered Jan 15 '23 01:01
