I have a list of topics (currently 10) whose size can increase in the future. I know we can spawn one thread per topic to consume from each of them, but in that case the number of consumer threads grows with the number of topics, which I want to avoid, since the topics will not receive data very frequently and the threads would mostly sit idle.
Is there any way to have a single consumer consume from all the topics? If yes, how can we achieve it? Also, how will the offsets be maintained by Kafka? Please suggest answers.
Multi-Topic Consumers
We may have a consumer group that listens to multiple topics. If two topics have the same key-partitioning scheme and the same number of partitions, we can join data across them.
Yes, Kafka's design allows a consumer from a consumer group to consume messages from multiple topics. The protocol underlying consumer.poll() allows sending fetch requests for multiple partitions (across topics as well) in a single request.
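For illustration, here is a minimal sketch of a single consumer subscribed to several topics; the broker address, group id, and topic names are placeholders:

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
props.put("group.id", "multi-topic-group");         // placeholder consumer group id
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// One consumer instance subscribed to many topics
consumer.subscribe(Arrays.asList("topic1", "topic2", "topic3"));

while (true) {
    // A single poll() can return records from several topics and partitions
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        // record.topic() identifies which topic the message came from
        System.out.println(record.topic() + "-" + record.partition()
                + " @ " + record.offset() + ": " + record.value());
    }
}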
The rule in Kafka is that only one consumer in a consumer group can be assigned to consume messages from a given partition of a topic, so multiple consumers from the same consumer group cannot read the same message from a partition.
Kafka Consumer Group ID
It is important to note that each topic partition is assigned to only one consumer within a consumer group, but a consumer from a consumer group can be assigned multiple partitions.
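To see that assignment in action, you can inspect consumer.assignment() after a poll; a small sketch, assuming the consumer instance from the example above:

// After poll() has returned, assignment() shows the partitions (across all subscribed
// topics) that the group coordinator has handed to this particular consumer instance.
for (TopicPartition tp : consumer.assignment()) {
    System.out.println("Assigned: " + tp.topic() + "-" + tp.partition());
}
// Starting a second instance with the same group.id triggers a rebalance, after which
// the partitions of all subscribed topics are split between the two instances.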
We can subscribe to multiple topics using the following API: consumer.subscribe(Arrays.asList(topic1, topic2), listener), where listener is a ConsumerRebalanceListener implementation (there is also an overload that takes only the collection of topics).
The consumer has the topic information, and we can commit offsets using consumer.commitAsync() or consumer.commitSync() by creating an OffsetAndMetadata object as follows.
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (TopicPartition partition : records.partitions()) {
    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
    for (ConsumerRecord<String, String> record : partitionRecords) {
        System.out.println(record.offset() + ": " + record.value());
    }
    // Commit the offset of the next record to be consumed for this partition
    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
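If you do not want to block on every commit, the same per-partition offset map can be passed to commitAsync() with a callback; a minimal sketch, reusing the partition and lastOffset variables from the loop above:

consumer.commitAsync(
        Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)),
        (offsets, exception) -> {
            if (exception != null) {
                // The commit failed; a later commit for the same partition will supersede it
                System.err.println("Commit failed for " + offsets + ": " + exception.getMessage());
            }
        });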