
spring kafka throws InstanceAlreadyExistsException after setting concurrency > 1

I'm using spring-kafka. Everything works as long as I don't set the concurrency of ConcurrentKafkaListenerContainerFactory, but when I set it to a number greater than 1, I get an exception:

javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=client-3

My config:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

    // Number of consumer threads, read from kafka.concurrency
    factory.setConcurrency(kafkaConfig.getConcurrency());

    factory.getContainerProperties().setAckMode(AckMode.MANUAL);
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

properties:

kafka.enable-auto-commit=false

kafka.client-id=client-1

kafka.concurrency=2
rellocs wood asked Apr 25 '17 03:04


1 Answer

I have opened an issue for this on GitHub. Setting a different client.id for each thread is not currently supported.
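For context, the ObjectName in the exception (kafka.consumer:type=app-info,id=...) suggests the failure is a JMX name collision: every consumer that shares one client.id asks to register an MBean under the same name. The following is a minimal JDK-only sketch of that mechanism, not Kafka's actual code. The class DuplicateClientIdDemo, the AppInfo bean, and the "demo.consumer" domain are made up for illustration.

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class DuplicateClientIdDemo {

    // Standard MBean convention: the interface is named <class name> + "MBean".
    public interface AppInfoMBean {
        String getClientId();
    }

    // Hypothetical stand-in for the per-consumer bean a client would register.
    public static class AppInfo implements AppInfoMBean {
        private final String clientId;

        public AppInfo(String clientId) {
            this.clientId = clientId;
        }

        @Override
        public String getClientId() {
            return clientId;
        }
    }

    /**
     * Registers one MBean per simulated consumer under
     * "<domain>:type=app-info,id=<clientId>" and returns how many
     * registrations were rejected because the name was already taken.
     */
    public static int registerConsumers(String domain, String... clientIds) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        int collisions = 0;
        for (String id : clientIds) {
            try {
                ObjectName name = new ObjectName(domain + ":type=app-info,id=" + id);
                server.registerMBean(new AppInfo(id), name);
            } catch (InstanceAlreadyExistsException e) {
                // Same ObjectName requested twice: this is the error from the question.
                collisions++;
            } catch (JMException e) {
                throw new IllegalStateException("Unexpected JMX failure", e);
            }
        }
        return collisions;
    }

    public static void main(String[] args) {
        // Two simulated consumers sharing one client.id, as with concurrency=2.
        int collisions = registerConsumers("demo.consumer", "client-1", "client-1");
        System.out.println("Rejected registrations: " + collisions);
    }
}
```

With distinct ids (for example "consumer-1" and "consumer-2") the same call registers both beans without a collision, which is why unique client ids avoid the exception.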

As a work-around, you could launch a separate KafkaMessageListenerContainer for each consumer thread, each with its own client.id (running one such container per thread is what the ConcurrentMessageListenerContainer does internally).
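The configuration side of this work-around can be sketched as below: copy the shared consumer settings once per container and give each copy its own client.id. This is only a sketch under assumptions. The class PerContainerClientIds, the method perContainerConfigs, and the placeholder broker address and group id are hypothetical. Wiring each map into its own consumer factory (for example a DefaultKafkaConsumerFactory) and KafkaMessageListenerContainer is left out because the exact classes and packages depend on the spring-kafka version.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PerContainerClientIds {

    /**
     * Returns one config map per container. Each map is an independent copy of
     * the base settings plus a unique "client.id" of the form <prefix>-<n>.
     * The base map itself is not modified.
     */
    public static List<Map<String, Object>> perContainerConfigs(
            Map<String, Object> base, String clientIdPrefix, int containers) {
        List<Map<String, Object>> configs = new ArrayList<>();
        for (int i = 1; i <= containers; i++) {
            Map<String, Object> copy = new HashMap<>(base);
            copy.put("client.id", clientIdPrefix + "-" + i);
            configs.add(copy);
        }
        return configs;
    }

    public static void main(String[] args) {
        Map<String, Object> base = new HashMap<>();
        base.put("bootstrap.servers", "localhost:9092"); // placeholder address
        base.put("group.id", "demo-group");              // placeholder group id
        base.put("enable.auto.commit", false);

        for (Map<String, Object> config : perContainerConfigs(base, "client", 2)) {
            // In a real application each map would be handed to its own consumer
            // factory and listener container instead of being printed.
            System.out.println(config.get("client.id"));
        }
    }
}
```

Keeping all containers in the same group.id means they still share the topic's partitions, which is the behaviour the concurrency setting was meant to provide.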

EDIT

Although not ideal, you can also omit the client.id, and the Kafka client will generate one for each consumer (consumer-1, consumer-2, etc.).

Gary Russell answered Nov 01 '22 12:11
