I am new to Spring and Kafka. I am working on a use case [using SpringBoot-kafka] where in users are allowed to create kafka topics at runtime. The spring application is expected to subscribe to these topics pro-grammatically at runtime. What i know so far is that, Kafka listener are design time and hence topics needs to be specified before startup. Is there a way to dynamically subscribe to kafka topics in SpringBoot-Kafka integration?
Referred this https://github.com/spring-projects/spring-kafka/issues/132
Current approach that i am planning to implement is, do not use Spring-Kafka integration instead implement Kafka consumer myself [using java code] as mentioned here spring boot kafka consumer - how to properly consume kafka messages from spring boot
You cannot "dynamically pass topics to Kafka listener "; you have to programmatically create a listener container instead.
To create a Kafka topic programmatically introduce a configuration class that annotated with @Configuration : this annotation indicates that the Java class can be used by Spring as a source of bean definitions. Next to the name of the Kafka topic name you can specify: the number of partitions for the topic.
So the rule in Kafka is only one consumer in a consumer group can be assigned to consume messages from a partition in a topic and hence multiple Kafka consumers from a consumer group can not read the same message from a partition.
Multi-Topic Consumers We may have a consumer group that listens to multiple topics. If they have the same key-partitioning scheme and number of partitions across two topics, we can join data across the two topics.
Kafka listeners are only "design time" if you want to specify them using annotations. Spring-kafka allows you to create them dynamically as well, see KafkaMessageListenerContainer.
The simplest example of Kafka listener created on the fly would be:
Map<String, Object> consumerConfig = ImmutableMap.of(
BOOTSTRAP_SERVERS_CONFIG, "brokerAddress",
GROUP_ID_CONFIG, "groupId"
);
DefaultKafkaConsumerFactory<String, String> kafkaConsumerFactory =
new DefaultKafkaConsumerFactory<>(
consumerConfig,
new StringDeserializer(),
new StringDeserializer());
ContainerProperties containerProperties = new ContainerProperties("topicName");
containerProperties.setMessageListener((MessageListener<String, String>) record -> {
//do something with received record
}
ConcurrentMessageListenerContainer container =
new ConcurrentMessageListenerContainer<>(
kafkaConsumerFactory,
containerProperties);
container.start();
For more explanation and code see this blog post: http://www.douevencode.com/articles/2017-12/spring-kafka-without-annotations/
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With