I am working with Kafka and trying to set up a consumer group by following this article. The only difference is that I have created my own abstract class and handler to make the design simpler.
Below is my abstract class:
public abstract class Consumer implements Runnable {
    private final Properties consumerProps;
    private final String consumerName;

    public Consumer(String consumerName, Properties consumerProps) {
        this.consumerName = consumerName;
        this.consumerProps = consumerProps;
    }

    protected abstract void shutdown();

    protected abstract void run(String consumerName, Properties consumerProps);

    @Override
    public final void run() {
        run(consumerName, consumerProps);
    }
}
Below is my KafkaConsumerA, which extends the above abstract class:
public class KafkaConsumerA extends Consumer {
    private KafkaConsumer<byte[], DataHolder> consumer;

    public KafkaConsumerA(String consumerName, Properties consumerProps) {
        super(consumerName, consumerProps);
    }

    @Override
    public void shutdown() {
        consumer.wakeup();
    }

    @Override
    protected void run(String consumerName, Properties consumerProps) {
        // exception comes from below line from two of the threads and the remaining one thread works fine.
        consumer = new KafkaConsumer<>(consumerProps);
        List<String> topics = getTopicsBasisOnConsumerName(consumerName);
        try {
            consumer.subscribe(topics);
            // Setup the schema config
            Map<String, Object> config = new HashMap<>();
            config.put("urls", "https://abc.qa.host.com");
            GenericRecordDomainDataDecoder decoder = new GenericRecordDomainDataDecoder(config);
            while (true) {
                ConsumerRecords<byte[], DataHolder> records = consumer.poll(200);
                for (ConsumerRecord<byte[], DataHolder> record : records) {
                    Map<String, Object> data = new HashMap<>();
                    data.put("partition", record.partition());
                    data.put("offset", record.offset());
                    data.put("value", record.value());
                    System.out.println((Thread.currentThread().getId() % 3) + 1 + ": " + decoder.decode(record.value()));
                }
            }
        } catch (WakeupException ex) {
            ex.printStackTrace();
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
And below is my Handler class:
// looks like something is wrong in this class
public final class ConsumerHandler {
    private final ExecutorService executorServiceProcess;
    private final Consumer consumer;
    private final List<Consumer> consumers = new ArrayList<>();

    public ConsumerHandler(Consumer consumer, int poolSize) {
        this.executorServiceProcess = Executors.newFixedThreadPool(poolSize);
        this.consumer = consumer;
        for (int i = 0; i < poolSize; i++) {
            consumers.add(consumer);
            executorServiceProcess.submit(consumer);
        }
    }

    public void shutdown() {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                for (Consumer consumer : consumers) {
                    consumer.shutdown();
                }
                executorServiceProcess.shutdown();
                try {
                    executorServiceProcess.awaitTermination(1000, TimeUnit.MILLISECONDS);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
        });
    }
}
And here is how I start all my consumers in the consumer group from the main class:
public static void main(String[] args) {
    ConsumerHandler handlerA =
        new ConsumerHandler(new KafkaConsumerA("KafkaConsumerA", getConsumerProps()), 3);
    // run KafkaConsumerB here
    handlerA.shutdown();
    // shutdown KafkaConsumerB here
}
With this, my plan is to set up a consumer group with three consumers in KafkaConsumerA, all three subscribed to the same topics.
Error:
Whenever I run this, it looks like only one consumer in the consumer group works and the other two don't. I see this exception on the console from those two:
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=a97716e0-0e05-4938-8fa1-6b872cf24e34
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[na:1.7.0_79]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[na:1.7.0_79]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[na:1.7.0_79]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[na:1.7.0_79]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) ~[na:1.7.0_79]
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[na:1.7.0_79]
at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58) ~[kafka-clients-0.10.0.0-SASL.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:694) [kafka-clients-0.10.0.0-SASL.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:587) [kafka-clients-0.10.0.0-SASL.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:569) [kafka-clients-0.10.0.0-SASL.jar:na]
What am I doing wrong here? The getConsumerProps() method returns a Properties object that has client.id and group.id in it, with the same values for all three consumers in that consumer group.
Below are my design details:
KafkaConsumerA will have three consumers in a consumer group, and each consumer will work on topicA.
KafkaConsumerB (similar to KafkaConsumerA) will have two consumers in a different consumer group, and each of those consumers will work on topicB.
These two consumers, KafkaConsumerA and KafkaConsumerB, will run on the same box in different consumer groups, independent of each other.
Kafka registers MBeans for application monitoring and uses the client.id to name them. As you said, the properties are injected through your abstract class, so every consumer in group A gets the same client.id and group.id. However, these are different clients, so you should give each one its own client.id while keeping the same group.id. This registers the different clients/consumers in the same consumer group and lets them work together without clashing on the MBean registration.
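One way to follow this advice is sketched below. It is only an illustration: the class name ClientIdExample, the helper perConsumerProps, and the "-0", "-1", ... suffix scheme are assumptions made for this example and are not part of Kafka or of the code in the question. The sketch uses only java.util.Properties, so it runs without a Kafka dependency.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class ClientIdExample {

    // Hypothetical helper (not a Kafka API): copies the shared properties
    // once per consumer and gives each copy its own client.id, while the
    // group.id is carried over unchanged.
    static List<Properties> perConsumerProps(Properties base, int count) {
        // Falls back to "consumer" if no client.id was configured (an assumption).
        String baseClientId = base.getProperty("client.id", "consumer");
        List<Properties> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            Properties copy = new Properties();
            copy.putAll(base);
            copy.setProperty("client.id", baseClientId + "-" + i);
            result.add(copy);
        }
        return result;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("group.id", "groupA");
        base.setProperty("client.id", "KafkaConsumerA");
        // Prints the group.id and client.id of each derived configuration.
        for (Properties p : perConsumerProps(base, 3)) {
            System.out.println(p.getProperty("group.id") + " / " + p.getProperty("client.id"));
        }
    }
}
```

Each Properties object would then go to its own consumer instance, for example one new KafkaConsumerA per thread. Note that the ConsumerHandler in the question submits the same Consumer instance poolSize times, so all three threads share one set of properties and one consumer field.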
I understand this is an old question, but annotations are used a lot these days, so I am adding another flavour of the problem and its answer. We faced the same issue while using the @KafkaListener annotation on two consumers in the same app, with most of the properties injected directly:
@KafkaListener(
topics = "${app.source}",
groupId = "${app.kafka.consumer.group-id}",
clientIdPrefix = "subscriber",
containerFactory = ListenerContainerFactory.AVRO_SPECIFIC,
errorHandler = "customConsumerAwareListenerErrorHandler"
)
Our consumers had similar implementations but were connected to different topics, so we simply changed the clientIdPrefix to give each one a unique value. The final code was:
// First consumer component
@KafkaListener(
topics = "${app.source}",
groupId = "${app.kafka.consumer.group-id}",
clientIdPrefix = "firstSubscriber",
containerFactory = ListenerContainerFactory.AVRO_SPECIFIC,
errorHandler = "customConsumerAwareListenerErrorHandler"
)
// Second consumer component
@KafkaListener(
topics = "${app.source}",
groupId = "${app.kafka.consumer.group-id}",
clientIdPrefix = "secondSubscriber",
containerFactory = ListenerContainerFactory.AVRO_SPECIFIC,
errorHandler = "customConsumerAwareListenerErrorHandler"
)