Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Kafka consumer for multiple topic

I have a list of topics (for now it's 10) whose size can increase in future. I know we can spawn multiple threads (per topic) to consume from each topic, but in my case if the number of topics increases, then the number of threads consuming from the topics increases, which I do not want, since the topics are not going to get data too frequently, so the threads will sit ideal.

Is there any way to have a single consumer to consume from all topics? If yes, then how can we achieve it? Also how will the offset be maintained by Kafka? Please suggest answers.

like image 592
Apollo Avatar asked Sep 19 '16 08:09

Apollo


People also ask

Can one Kafka consumer listen to multiple topics?

Multi-Topic ConsumersWe may have a consumer group that listens to multiple topics. If they have the same key-partitioning scheme and number of partitions across two topics, we can join data across the two topics.

Can a single consumer consume from multiple topics?

Yes, Kafka's design allows consumers from one consumer group to consume messages from multiple topics. The protocol underlying consumer. poll() allows sending requests for multiple partitions(across topics as well) in one request.

Can a Kafka consumer be part of multiple consumer groups?

So the rule in Kafka is only one consumer in a consumer group can be assigned to consume messages from a partition in a topic and hence multiple Kafka consumers from a consumer group can not read the same message from a partition.

Is Kafka consumer group per topic?

Kafka Consumer Group ID It is important to note that each topic partition is only assigned to one consumer within a consumer group, but a consumer from a consumer group can be assigned multiple partitions.


1 Answers

We can subscribe for multiple topic using following API : consumer.subscribe(Arrays.asList(topic1,topic2), ConsumerRebalanceListener obj)

Consumer has the topic info and we can comit using consumer.commitAsync or consumer.commitSync() by creating OffsetAndMetadata object as follows.

ConsumerRecords<String, String> records = consumer.poll(long value); for (TopicPartition partition : records.partitions()) {     List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);     for (ConsumerRecord<String, String> record : partitionRecords) {         System.out.println(record.offset() + ": " + record.value());     }     long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();     consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); } 
like image 191
Subrata Saha Avatar answered Sep 23 '22 07:09

Subrata Saha