I am using Spring Kafka and would like to define multiple consumer group IDs in application.yml so that I can use them in my @KafkaListener class, where I listen to multiple topics.
My Kafka properties in application.yml currently look like this:
kafka:
  properties:
    topics:
      topic1: topic1
      topic2: topic2
  bootstrap-servers: server1,server2
  producer:
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    retries: 4
  consumer:
    group-id: mygroupid
    auto-offset-reset: latest
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Is there a way to have multiple group IDs in the consumer block above?
Alternatively, I have given a different groupId in the @KafkaListener annotation in my Spring code, as shown below, but I am not sure how to set up the rest of the properties, such as auto-offset-reset and key-deserializer:
@KafkaListener(topics = "topic1", groupId = "cd1")
public void consumeMessage(String message) throws Exception {
    // some code goes here
}
Please let me know how to accomplish what I am trying to do as I am new to kafka.
Boot only autoconfigures one DefaultKafkaConsumerFactory and one DefaultKafkaProducerFactory from the yml, so all the properties are shared across all consumers. That's why we added the groupId attribute to @KafkaListener (the id is used as the group if groupId is not provided); this is the most common property that changes between consumers.
You can, of course, use property placeholders so groupId = "${group.one}" will use the property group.one from the yml.
To change more fundamental settings such as serializers/deserializers, if you are using a spring-kafka version earlier than 2.2.4, you will need to create multiple consumer factories and listener container factories.
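As a rough sketch of the multiple-factories approach, the configuration below defines two listener container factories that differ in value deserializer and auto.offset.reset. The class name ListenerFactoryConfig, the bean names stringLatestFactory and bytesEarliestFactory, and the hard-coded broker list are assumptions made for illustration; adapt them to your application.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

// Hypothetical configuration class; names are illustrative only.
@Configuration
public class ListenerFactoryConfig {

    // Builds consumer properties in the native Kafka dotted format.
    // The broker list is copied from the question's yml (assumption).
    private static Map<String, Object> consumerProps(Class<?> valueDeserializer,
                                                     String offsetReset) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "server1,server2");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
        return props;
    }

    // Container factory for listeners that read String values from the latest offset.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> stringLatestFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<String, String>(
                consumerProps(StringDeserializer.class, "latest")));
        return factory;
    }

    // Container factory for listeners that read raw bytes from the earliest offset.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, byte[]> bytesEarliestFactory() {
        ConcurrentKafkaListenerContainerFactory<String, byte[]> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<String, byte[]>(
                consumerProps(ByteArrayDeserializer.class, "earliest")));
        return factory;
    }
}
```

A listener then picks a factory by bean name, for example @KafkaListener(topics = "topic2", groupId = "cd2", containerFactory = "bytesEarliestFactory").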
However, starting with version 2.2.4, you can set any arbitrary Kafka consumer property in the @KafkaListener annotation:
/**
* Kafka consumer properties; they will supersede any properties with the same name
* defined in the consumer factory (if the consumer factory supports property overrides).
* <h3>Supported Syntax</h3>
* <p>The supported syntax for key-value pairs is the same as the
* syntax defined for entries in a Java
* {@linkplain java.util.Properties#load(java.io.Reader) properties file}:
* <ul>
* <li>{@code key=value}</li>
* <li>{@code key:value}</li>
* <li>{@code key value}</li>
* </ul>
* {@code group.id} and {@code client.id} are ignored.
* @return the properties.
* @since 2.2.4
* @see org.apache.kafka.clients.consumer.ConsumerConfig
* @see #groupId()
* @see #clientIdPrefix()
*/
String[] properties() default {};
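To make the three supported syntaxes and the override rule concrete, here is a small standalone sketch that uses only java.util.Properties. It illustrates the semantics described in the Javadoc above and is not the library's own code; the class ListenerPropertyOverrides and its parse and merge methods are hypothetical names.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper that mimics how annotation properties could be applied.
public class ListenerPropertyOverrides {

    // Parses entries written as "key=value", "key:value" or "key value".
    public static Properties parse(String... entries) throws IOException {
        Properties parsed = new Properties();
        for (String entry : entries) {
            parsed.load(new StringReader(entry));
        }
        return parsed;
    }

    // Overlays the parsed entries on the factory defaults.
    // group.id and client.id are skipped, as the Javadoc says they are ignored.
    public static Map<String, Object> merge(Map<String, Object> factoryDefaults,
                                            Properties overrides) {
        Map<String, Object> merged = new LinkedHashMap<>(factoryDefaults);
        for (String name : overrides.stringPropertyNames()) {
            if (!name.equals("group.id") && !name.equals("client.id")) {
                merged.put(name, overrides.getProperty(name));
            }
        }
        return merged;
    }

    public static void main(String[] args) throws IOException {
        Map<String, Object> defaults = new LinkedHashMap<>();
        defaults.put("auto.offset.reset", "latest");
        defaults.put("group.id", "mygroupid");

        Properties overrides = parse(
                "auto.offset.reset=earliest",   // key=value
                "max.poll.interval.ms:60000",   // key:value
                "max.poll.records 100",         // key value
                "group.id=ignored");            // skipped by merge

        System.out.println(merge(defaults, overrides));
    }
}
```

In this sketch, auto.offset.reset ends up as earliest while group.id keeps its factory value, which matches the behaviour the Javadoc describes.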
Note that these properties use the native Kafka dotted format (auto.offset.reset), not the Boot hyphenated (auto-offset-reset) or camel case property names.
Here's an example from the documentation:
@KafkaListener(topics = "myTopic", groupId = "group", properties = {
    "max.poll.interval.ms:60000",
    ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})
Again, the values can be property placeholders.
On the producer side, you still need multiple producer factories if the producers need different properties.
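A minimal sketch of what that could look like follows, with one KafkaTemplate per producer factory. The class name ProducerTemplateConfig, the bean names, and the hard-coded broker list are assumptions for illustration; the serializers and retries value mirror the question's yml.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

// Hypothetical configuration class; names are illustrative only.
@Configuration
public class ProducerTemplateConfig {

    // Shared producer settings; only the value serializer varies.
    private static Map<String, Object> producerProps(Class<?> valueSerializer) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "server1,server2");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        props.put(ProducerConfig.RETRIES_CONFIG, 4);
        return props;
    }

    // Producer factory that sends plain String values.
    @Bean
    public ProducerFactory<String, String> stringProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerProps(StringSerializer.class));
    }

    // Producer factory that serializes values as JSON.
    @Bean
    public ProducerFactory<String, Object> jsonProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerProps(JsonSerializer.class));
    }

    @Bean
    public KafkaTemplate<String, String> stringTemplate() {
        return new KafkaTemplate<>(stringProducerFactory());
    }

    @Bean
    public KafkaTemplate<String, Object> jsonTemplate() {
        return new KafkaTemplate<>(jsonProducerFactory());
    }
}
```

Each sender then injects the template whose generic types match what it publishes.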