I'm using spring-kafka. Everything works if I don't set the concurrency of ConcurrentKafkaListenerContainerFactory, but when I set it to a number greater than 1, I get an exception:
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=client-3
My config:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<String, String>();
    factory.setConcurrency(kafkaConfig.getConcurrency());
    factory.getContainerProperties().setAckMode(AckMode.MANUAL);
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
properties:
kafka.enable-auto-commit=false
kafka.client-id=client-1
kafka.concurrency=2
I have opened an issue for this on GitHub. Setting a different client.id for each thread is not currently supported.
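To illustrate why a shared client.id leads to this exception, here is a minimal sketch using only the JDK's JMX classes. It assumes the Kafka client registers one MBean per consumer under the name pattern shown in the exception (kafka.consumer:type=app-info,id=<client.id>); the class DuplicateClientIdDemo and its AppInfo bean are hypothetical stand-ins, not Kafka's own classes.

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class DuplicateClientIdDemo {

    // Standard MBean convention: the interface is named <ClassName>MBean.
    public interface AppInfoMBean {
        String getClientId();
    }

    // Hypothetical stand-in for the per-consumer "app-info" bean.
    public static class AppInfo implements AppInfoMBean {
        private final String clientId;

        public AppInfo(String clientId) {
            this.clientId = clientId;
        }

        @Override
        public String getClientId() {
            return clientId;
        }
    }

    // Registers a bean under the name pattern from the exception message.
    // Returns true on success and false if that name is already registered.
    public static boolean register(MBeanServer server, String clientId) {
        try {
            ObjectName name = new ObjectName("kafka.consumer:type=app-info,id=" + clientId);
            server.registerMBean(new AppInfo(clientId), name);
            return true;
        } catch (InstanceAlreadyExistsException e) {
            return false; // a second consumer with the same client.id ends up here
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        System.out.println(register(server, "client-1")); // prints true
        System.out.println(register(server, "client-1")); // prints false
    }
}
```

With concurrency set to 2 and a single configured client.id, both consumers would try to register under the same name, which matches the behaviour described in the question.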
As a workaround, you could launch a separate KafkaMessageListenerContainer for each consumer (which is what the ConcurrentMessageListenerContainer does internally).
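The configuration side of that workaround could be sketched as follows: one property map per container, each with its own client.id. This is only a sketch under assumptions. The class PerContainerConsumerProps, the "-0", "-1" suffix scheme, and the placeholder broker address and group name are all hypothetical; each resulting map would then be handed to its own consumer factory (for example a DefaultKafkaConsumerFactory) backing one KafkaMessageListenerContainer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PerContainerConsumerProps {

    // Copies the shared settings once per container and gives each copy
    // a distinct client.id: <prefix>-0, <prefix>-1, ...
    public static List<Map<String, Object>> build(
            Map<String, Object> base, String clientIdPrefix, int concurrency) {
        List<Map<String, Object>> all = new ArrayList<>();
        for (int i = 0; i < concurrency; i++) {
            Map<String, Object> props = new HashMap<>(base);
            props.put("client.id", clientIdPrefix + "-" + i);
            all.add(props);
        }
        return all;
    }

    public static void main(String[] args) {
        Map<String, Object> base = new HashMap<>();
        base.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        base.put("group.id", "example-group");           // placeholder group name
        base.put("enable.auto.commit", false);           // as in the question's properties

        for (Map<String, Object> props : build(base, "client-1", 2)) {
            System.out.println(props.get("client.id"));
        }
    }
}
```

Because every container keeps the same group.id, the consumers still share the topic's partitions; only the client.id differs.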
EDIT
Although not ideal, you can omit the client.id and the Kafka client will generate one for each consumer (consumer-1, consumer-2, etc.).
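One way to apply this without touching the rest of the configuration is to strip the client.id entry from the consumer properties before they reach the consumer factory. A minimal sketch, assuming the properties are held in a plain map (the class ConsumerPropsWithoutClientId and the placeholder values are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerPropsWithoutClientId {

    // Returns a copy of the given consumer properties with client.id removed,
    // leaving it to the Kafka client to generate an id per consumer.
    public static Map<String, Object> withoutClientId(Map<String, Object> props) {
        Map<String, Object> copy = new HashMap<>(props);
        copy.remove("client.id");
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "example-group");           // placeholder group name
        props.put("client.id", "client-1");               // the setting from the question

        System.out.println(withoutClientId(props).containsKey("client.id")); // prints false
    }
}
```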