I followed "Intro to Apache Kafka with Spring" tutorial by baeldung.com.
I set up a KafkaConsumerConfig class with the kafkaConsumerFactory method:
private ConsumerFactory<String, String> kafkaConsumerFactory(String groupId) {
Map<String, Object> props = new HashMap<>();
...
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
...
return new DefaultKafkaConsumerFactory<>(props);
}
and two "custom" factories:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
return kafkaListenerContainerFactory("foo");
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> barKafkaListenerContainerFactory() {
return kafkaListenerContainerFactory("bar");
}
In the MessageListener class, instead I used @KafkaListener annotation to register consumers with the given groupId to listen on a topic:
@KafkaListener(topics = "${message.topic.name}", groupId = "foo", containerFactory = "fooKafkaListenerContainerFactory")
public void listenGroupFoo(String message) {
System.out.println("Received Message in group 'foo': " + message);
...
}
@KafkaListener(topics = "${message.topic.name}", groupId = "bar", containerFactory = "barKafkaListenerContainerFactory")
public void listenGroupBar(String message) {
System.out.println("Received Message in group 'bar': " + message);
...
}
In this way there are two group of consumers, the ones having groupId "foo" and the ones having groupId "bar".
Now if I change container factory for the "foo" consumers from fooKafkaListenerContainerFactory to barKafkaListenerContainerFactory in this way
@KafkaListener(topics = "${message.topic.name}", groupId = "foo", containerFactory = "barKafkaListenerContainerFactory")
public void listenGroupFoo(String message) {
...
}
It seems an incompatibility between groupId of KafkaListener and groupId of container factory but nothing changes.
So, what I'm trying to understand is what props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);property does and why it seem is not considered.
The factory groupId is a default which is only used if there is no groupId (or id) on the @KafkaListener.
In early versions, it was only possible to set the groupId on the factory, which meant you needed a separate factory for each listener if different groups are needed, which defeats the idea of a factory that can be used for multiple listeners.
See the javadocs...
/**
* Override the {@code group.id} property for the consumer factory with this value
* for this listener only.
* <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
* @return the group id.
* @since 1.3
*/
String groupId() default "";
/**
* When {@link #groupId() groupId} is not provided, use the {@link #id() id} (if
* provided) as the {@code group.id} property for the consumer. Set to false, to use
* the {@code group.id} from the consumer factory.
* @return false to disable.
* @since 1.3
*/
boolean idIsGroup() default true;
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With