Anyone know if a single listener can listens to multiple topic like below? I know just "topic1" works, what if I want to add additional topics? Can you please show example for both below? Thanks for the help!
@KafkaListener(topics = "topic1,topic2")
public void listen(ConsumerRecord<?, ?> record, Acknowledgment ack) {
System.out.println(record);
}
or
ContainerProperties containerProps = new ContainerProperties(new TopicPartitionInitialOffset("topic1, topic2", 0));
Yes, Kafka's design allows consumers from one consumer group to consume messages from multiple topics.
Multi-Topic Consumers We may have a consumer group that listens to multiple topics. If they have the same key-partitioning scheme and number of partitions across two topics, we can join data across the two topics.
You can't have multiple consumers that belong to the same group in one thread and you can't have multiple threads safely use the same consumer. One consumer per thread is the rule. To run multiple consumers in the same group in one application, you will need to run each in its own thread.
In general, concurrency is the ability to perform parallel processing with no affect on the end result. In Kafka, the parallel consumption of messages is achieved through consumer groups where individual consumers read from a given topic/partitions in parallel.
Yes, just follow the @KafkaListener
JavaDocs:
/**
* The topics for this listener.
* The entries can be 'topic name', 'property-placeholder keys' or 'expressions'.
* Expression must be resolved to the topic name.
* Mutually exclusive with {@link #topicPattern()} and {@link #topicPartitions()}.
* @return the topic names or expressions (SpEL) to listen to.
*/
String[] topics() default {};
/**
* The topic pattern for this listener.
* The entries can be 'topic name', 'property-placeholder keys' or 'expressions'.
* Expression must be resolved to the topic pattern.
* Mutually exclusive with {@link #topics()} and {@link #topicPartitions()}.
* @return the topic pattern or expression (SpEL).
*/
String topicPattern() default "";
/**
* The topicPartitions for this listener.
* Mutually exclusive with {@link #topicPattern()} and {@link #topics()}.
* @return the topic names or expressions (SpEL) to listen to.
*/
TopicPartition[] topicPartitions() default {};
So, your use-case should be like:
@KafkaListener(topics = {"topic1" , "topic2"})
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With