Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How does Consumer.endOffsets work in Kafka?

Assume I've a timer task running indefinitely which iterates over the all the consumer groups in the kafka cluster and outputs lag, committed offset and end offset for all partitions for each group. Similar to how Kafka console consumer group script works except it's for all groups.

Something like

Single Consumer - Not Working - Doesn't return offsets for some of the provided topic partitions ( ex. 10 provided - 5 Offsets Returned )

Consumer consumer;

static {
  consumer = createConsumer();
}

run() { 
  List<String> groupIds = getConsumerGroups();
  for(String groupId: groupIds) {
       List<TopicParition> topicParitions =  getTopicParitions(groupId);
       consumer.endOffsets(topicParitions); -- Not working - missing offsets for some partitions for some groups (in 10 - out 5)
   }
}

Multiple Consumers - Working

run() { 
   List<String> groupIds = getConsumerGroups();
   for(String groupId: groupIds) {
        List<TopicParition> topicParitions =  getTopicParitions(groupId);
        Consumer consumer = createConsumer();
        consumer.endOffsets(topicParitions); This works!!!
   }
 }

Versions:Kafka-Client 2.0.0

Am I using the consumer api incorrectly ? Ideally I would like to use single consumer.

Let me know if you need more details.

like image 548
s7vr Avatar asked Nov 07 '22 01:11

s7vr


1 Answers

I think you're almost there. First collect all the topic partitions you're interested in, and then issue a consumer.endOffsets command.

Bear in mind that I haven't tried to run it, but something like this should work:

run() { 
   Consumer consumer = createConsumer();
   List<String> groupIds = getConsumerGroups();
   List<TopicPartition> topicPartitions = new ArrayList<>();

   for (String groupId: groupIds) {
        topicPartitions.addAll(getTopicPartitions(groupId));
   }

   consumer.endOffsets(topicPartitions); 
}
like image 91
mjuarez Avatar answered Nov 14 '22 03:11

mjuarez