Assume I've a timer task running indefinitely which iterates over the all the consumer groups in the kafka cluster and outputs lag, committed offset and end offset for all partitions for each group. Similar to how Kafka console consumer group script works except it's for all groups.
Something like
Single Consumer - Not Working - Doesn't return offsets for some of the provided topic partitions ( ex. 10 provided - 5 Offsets Returned )
Consumer consumer;
static {
consumer = createConsumer();
}
run() {
List<String> groupIds = getConsumerGroups();
for(String groupId: groupIds) {
List<TopicParition> topicParitions = getTopicParitions(groupId);
consumer.endOffsets(topicParitions); -- Not working - missing offsets for some partitions for some groups (in 10 - out 5)
}
}
Multiple Consumers - Working
run() {
List<String> groupIds = getConsumerGroups();
for(String groupId: groupIds) {
List<TopicParition> topicParitions = getTopicParitions(groupId);
Consumer consumer = createConsumer();
consumer.endOffsets(topicParitions); This works!!!
}
}
Versions:Kafka-Client 2.0.0
Am I using the consumer api incorrectly ? Ideally I would like to use single consumer.
Let me know if you need more details.
I think you're almost there. First collect all the topic partitions you're interested in, and then issue a consumer.endOffsets
command.
Bear in mind that I haven't tried to run it, but something like this should work:
run() {
Consumer consumer = createConsumer();
List<String> groupIds = getConsumerGroups();
List<TopicPartition> topicPartitions = new ArrayList<>();
for (String groupId: groupIds) {
topicPartitions.addAll(getTopicPartitions(groupId));
}
consumer.endOffsets(topicPartitions);
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With