
How do I use multiple consumers in Kafka?

I am a new student studying Kafka, and I've run into some fundamental issues with understanding multiple consumers that articles, documentation, etc. have not been too helpful with so far.

One thing I have tried to do is write my own high level Kafka producer and consumer and run them simultaneously, publishing 100 simple messages to a topic and having my consumer retrieve them. I have managed to do this successfully, but when I try to introduce a second consumer to consume from the same topic that messages were just published to, it receives no messages.

It was my understanding that for each topic, you could have consumers from separate consumer groups and each of these consumer groups would get a full copy of the messages produced to some topic. Is this correct? If not, what would be the proper way for me to set up multiple consumers? This is the consumer class that I have written so far:

public class AlternateConsumer extends Thread {
    private final KafkaConsumer<Integer, String> consumer;
    private final String topic;
    private final Boolean isAsync = false;

    public AlternateConsumer(String topic, String consumerGroup) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", consumerGroup);
        properties.put("partition.assignment.strategy", "roundrobin");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<Integer, String>(properties);
        consumer.subscribe(topic);
        this.topic = topic;
    }

    public void run() {
        while (true) {
            ConsumerRecords<Integer, String> records = consumer.poll(0);
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println("We received message: " + record.value() + " from topic: " + record.topic());
            }
        }
    }
}

Furthermore, I was originally testing the above consumer against a topic 'test' with only a single partition. When I added another consumer to an existing consumer group, say 'testGroup', this triggered a Kafka rebalance that increased my consumption latency significantly, on the order of seconds. I thought this was a rebalancing issue caused by having only a single partition, but when I created a new topic 'multiplepartitions' with, say, 6 partitions, similar issues arose: adding more consumers to the same consumer group caused latency issues. I have looked around and people are telling me I should be using a multi-threaded consumer. Can anyone shed light on that?

Jeff Gong asked Jun 17 '15 18:06





2 Answers

I think your problem lies with the auto.offset.reset property. When a new consumer reads from a partition and there's no previous committed offset, the auto.offset.reset property is used to decide what the starting offset should be. If you set it to "largest" (the default) you start reading at the latest (last) message. If you set it to "smallest" you get the first available message.

So add:

properties.put("auto.offset.reset", "smallest"); 

and try again.

* edit *

"smallest" and "largest" were deprecated a while back. You should use "earliest" or "latest" now. If you have any questions, check the docs.
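
As a minimal sketch of the suggestion above, the snippet below builds consumer settings with auto.offset.reset set to "earliest" for two different consumer groups. The class name ConsumerConfigSketch, the helper buildProps, and the group ids testGroupA and testGroupB are made up for illustration. The broker address and deserializers are copied from the question. The sketch only constructs java.util.Properties objects so that it runs without a broker; passing them to a KafkaConsumer is left to the reader.

```java
import java.util.Properties;

// Hypothetical helper class for illustration; not part of the question's code.
class ConsumerConfigSketch {

    // Builds consumer settings for the given group. With "earliest", a group that
    // has no committed offset starts from the first available message.
    static Properties buildProps(String consumerGroup) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // taken from the question
        props.put("group.id", consumerGroup);
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Two distinct (made-up) group ids: each group tracks its own offsets.
        for (String group : new String[] {"testGroupA", "testGroupB"}) {
            Properties props = buildProps(group);
            System.out.println(props.getProperty("group.id")
                    + " -> auto.offset.reset=" + props.getProperty("auto.offset.reset"));
        }
    }
}
```

Note that this setting only applies when the group has no committed offset for a partition. A group that has already committed offsets resumes from them.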

Chris Gerken answered Oct 15 '22 02:10



If you want multiple consumers to consume the same messages (like a broadcast), you can spawn them with different consumer groups and also set auto.offset.reset to smallest ("earliest" in newer clients) in the consumer config. If you want multiple consumers to divide the work and consume in parallel, you should create a number of partitions >= the number of consumers. Within a consumer group, a partition can be consumed by at most one consumer process, but one consumer can consume more than one partition.
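
To make the partition rule concrete, here is a simplified model of how partitions might be dealt out to the consumers of one group. It is an illustrative round-robin simulation, not Kafka's actual assignor, and the class AssignmentSketch and method assign are hypothetical names. It shows why a second consumer on a single-partition topic sits idle, while 6 partitions spread evenly over 3 consumers.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model for illustration only; Kafka's real assignors are more involved.
class AssignmentSketch {

    // Deals partitions 0..numPartitions-1 out to the consumers of one group in turn.
    // Each partition lands on exactly one consumer; a consumer may get several or none.
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int c = 0; c < numConsumers; c++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(p % numConsumers).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // One partition, two consumers in the same group: the second consumer is idle.
        System.out.println(assign(1, 2)); // prints [[0], []]
        // Six partitions, three consumers: two partitions each.
        System.out.println(assign(6, 3)); // prints [[0, 3], [1, 4], [2, 5]]
    }
}
```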

user1119541 answered Oct 15 '22 01:10
