
How to create a separate Kafka listener for each topic dynamically in Spring Boot?

I am new to Spring and Kafka. I am working on a use case (using Spring Boot with spring-kafka) in which users are allowed to create Kafka topics at runtime. The Spring application is expected to subscribe to these topics programmatically at runtime. What I know so far is that Kafka listeners are defined at design time, so the topics need to be specified before startup. Is there a way to subscribe to Kafka topics dynamically with the Spring Boot Kafka integration?

I have looked at this issue: https://github.com/spring-projects/spring-kafka/issues/132

My current plan is to not use the Spring-Kafka integration at all and instead implement the Kafka consumer myself in plain Java code, as described in "spring boot kafka consumer - how to properly consume kafka messages from spring boot".

asked Jan 08 '17 13:01 by Yashwanth


People also ask

How do you pass topics dynamically to a Kafka listener?

You cannot "dynamically pass topics to a Kafka listener"; you have to create a listener container programmatically instead.

How do I create a dynamic Kafka topic?

To create a Kafka topic programmatically, introduce a class annotated with @Configuration. This annotation indicates that Spring can use the class as a source of bean definitions. Along with the topic name, you can specify the number of partitions for the topic.

Can one Kafka topic have multiple consumers?

Within a consumer group, each partition of a topic is assigned to only one consumer, so multiple consumers from the same group cannot read the same message from a partition. Consumers in different groups each receive the messages independently.
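
As a rough illustration of that rule, the sketch below hands out partitions to the consumers of one group so that every partition has exactly one owner. It is plain Java and does not call Kafka; `GroupAssignmentSketch` and `assign` are hypothetical names, and the simple modulo scheme is an assumption made for the example. Kafka's real assignors are more involved.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration only; this is not Kafka's actual assignor. */
final class GroupAssignmentSketch {

    /** Gives partition p to consumer number (p % consumers.size()) of the group. */
    static Map<String, List<Integer>> assign(int partitionCount, List<String> consumers) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("a group needs at least one consumer");
        }
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition); // exactly one owner per partition
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Three partitions, two consumers: every partition has a single owner.
        System.out.println(assign(3, List.of("c0", "c1")));       // prints {c0=[0, 2], c1=[1]}
        // More consumers than partitions: the extra consumer stays idle.
        System.out.println(assign(2, List.of("c0", "c1", "c2"))); // prints {c0=[0], c1=[1], c2=[]}
    }
}
```

The second call shows a practical consequence: adding consumers to a group beyond the number of partitions does not increase parallelism.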

Can Kafka consumer group subscribe to multiple topics?

A consumer group may listen to multiple topics. If two topics share the same key-partitioning scheme and the same number of partitions, data can be joined across the two topics.


1 Answer

Kafka listeners are only "design time" if you want to specify them using annotations. Spring-kafka allows you to create them dynamically as well; see KafkaMessageListenerContainer and ConcurrentMessageListenerContainer.

The simplest example of Kafka listener created on the fly would be:

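import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;

import java.util.Map;
import com.google.common.collect.ImmutableMap; // Guava
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
// In spring-kafka versions before 2.2 this class lives in org.springframework.kafka.listener.config
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;

// Minimal consumer settings: broker address and consumer group
Map<String, Object> consumerConfig = ImmutableMap.of(
    BOOTSTRAP_SERVERS_CONFIG, "brokerAddress",
    GROUP_ID_CONFIG, "groupId"
);

DefaultKafkaConsumerFactory<String, String> kafkaConsumerFactory =
        new DefaultKafkaConsumerFactory<>(
                consumerConfig,
                new StringDeserializer(),
                new StringDeserializer());

// The topic to subscribe to is passed in at runtime
ContainerProperties containerProperties = new ContainerProperties("topicName");
containerProperties.setMessageListener((MessageListener<String, String>) record -> {
    // do something with the received record
});

ConcurrentMessageListenerContainer<String, String> container =
        new ConcurrentMessageListenerContainer<>(
                kafkaConsumerFactory,
                containerProperties);

// Starts consuming; call container.stop() to shut the listener down
container.start();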
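
Since the question asks for one listener per topic created at runtime, it may help to keep the started containers in a small registry keyed by topic name, so each topic gets at most one listener and can be stopped later. The following is only a sketch in plain Java: `TopicListenerRegistry` and `ListenerHandle` are hypothetical names, not part of spring-kafka, and the factory passed to the registry is where a real application would build a container as in the snippet above.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical helper that keeps one running listener per topic. */
final class TopicListenerRegistry {

    /** Stand-in for a container's lifecycle; a Spring container offers start() and stop() too. */
    interface ListenerHandle {
        void start();
        void stop();
    }

    private final Map<String, ListenerHandle> handles = new ConcurrentHashMap<>();
    private final Function<String, ListenerHandle> factory;

    /** The factory builds a not-yet-started listener for the given topic name. */
    TopicListenerRegistry(Function<String, ListenerHandle> factory) {
        this.factory = factory;
    }

    /** Creates and starts a listener for the topic; returns false if one already exists. */
    boolean subscribe(String topic) {
        boolean[] created = {false};
        handles.computeIfAbsent(topic, t -> {
            ListenerHandle handle = factory.apply(t);
            handle.start();
            created[0] = true;
            return handle;
        });
        return created[0];
    }

    /** Stops and forgets the listener for the topic; returns false if there was none. */
    boolean unsubscribe(String topic) {
        ListenerHandle handle = handles.remove(topic);
        if (handle == null) {
            return false;
        }
        handle.stop();
        return true;
    }

    /** Snapshot of the topics that currently have a listener, sorted by name. */
    Set<String> topics() {
        return new TreeSet<>(handles.keySet());
    }

    public static void main(String[] args) {
        // Demo factory that only logs; a real one would wrap a listener container.
        TopicListenerRegistry registry = new TopicListenerRegistry(topic -> new ListenerHandle() {
            @Override public void start() { System.out.println("started listener for " + topic); }
            @Override public void stop() { System.out.println("stopped listener for " + topic); }
        });
        registry.subscribe("orders");
        registry.subscribe("orders"); // second call is a no-op
        registry.subscribe("payments");
        registry.unsubscribe("orders");
        System.out.println(registry.topics()); // prints [payments]
    }
}
```

A service that handles "topic created" requests could call `subscribe` with the new topic name; stopping every remaining handle on application shutdown is left out of the sketch.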

For more explanation and code see this blog post: http://www.douevencode.com/articles/2017-12/spring-kafka-without-annotations/

answered Oct 07 '22 16:10 by michalbrz