I'm trying to use the low-level Consumer Java API to manage offsets manually, with the latest kafka_2.10-0.8.2.1. To verify that the offsets I commit to and read from Kafka are correct, I use the kafka.tools.ConsumerOffsetChecker tool.
Here is an example of the output for my topic/consumer group:
./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group elastic_search_group --zookeeper localhost:2181 --topic my_log_topic
Group                 Topic         Pid  Offset  logSize  Lag  Owner
elastic_search_group  my_log_topic  0    5       29       24   none
Here is my interpretation of the result:
Offset = 5 --> this is the current offset of my 'elastic_search_group' consumer
logSize = 29 --> this is the latest offset, i.e. the offset of the next message that will arrive on this topic/partition
Lag = 24 --> 29 - 5, i.e. how many messages have not yet been processed by my 'elastic_search_group' consumer
Pid - partition ID
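The lag arithmetic in this reading can be sketched in a few lines of Java. This is only an illustration of the interpretation above, not how ConsumerOffsetChecker is implemented: the names OffsetLag and lag are hypothetical and not part of Kafka, and the numbers are taken from the tool output shown earlier.

```java
// Hypothetical helper, not a Kafka class: reproduces the Lag column
// from the Offset and logSize columns of ConsumerOffsetChecker.
final class OffsetLag {
    private OffsetLag() {}

    // logSize is the offset of the next message to be written,
    // committedOffset is the consumer group's current offset.
    static long lag(long logSize, long committedOffset) {
        if (logSize < 0 || committedOffset < 0) {
            throw new IllegalArgumentException("offsets must be non-negative");
        }
        // Assumption: a consumer that is ahead of the log is reported with lag 0.
        return Math.max(0L, logSize - committedOffset);
    }

    public static void main(String[] args) {
        System.out.println("Lag = " + lag(29L, 5L)); // prints "Lag = 24"
    }
}
```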
Q1: Is this correct?
Now, I want to get the same information from my Java consumer. Here, I found that I had to use two different APIs:
kafka.javaapi.OffsetRequest to get Earliest and Latest offsets, but kafka.javaapi.OffsetFetchRequest to get the current offset.
To get Earliest (or Latest) offset I do:
// Ask for a single offset (maxNumOffsets = 1) for one topic/partition.
TopicAndPartition topicAndPartition = new TopicAndPartition(myTopic, myPartition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1));
// OR for Latest: requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
    requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = simpleConsumer.getOffsetsBefore(request);
// Use the same topic and partition that were put into requestInfo.
long[] offsets = response.offsets(myTopic, myPartition);
long myEarliestOffset = offsets[0];
// OR for Latest: long myLatestOffset = offsets[0];
And to get the current offset I have to use a completely different API:
short versionID = 0;
int correlationId = 0;
List<TopicAndPartition> topicPartitionList = new ArrayList<TopicAndPartition>();
TopicAndPartition myTopicAndPartition = new TopicAndPartition(myTopic, myPartition);
topicPartitionList.add(myTopicAndPartition);
OffsetFetchRequest offsetFetchReq = new OffsetFetchRequest(
kafkaGroupId, topicPartitionList, versionID, correlationId, kafkaGroupId);
OffsetFetchResponse offsetFetchResponse = simpleConsumer.fetchOffsets(offsetFetchReq);
long currentOffset = offsetFetchResponse.offsets().get(myTopicAndPartition).offset();
Q2: Is this correct? Why are there two different APIs to get very similar information?
Q3: Does it matter which versionId and correlationId I use here? I thought versionId should be 0 for Kafka before 0.8.2.1 and 1 for 0.8.2.1 and later, but it seems to work with 0 on 0.8.2.1 as well (see below).
So, for the example state of the topic above and the ConsumerOffsetChecker output above, here is what I get from my Java code:
currentOffset=5; earliestOffset=29; latestOffset=29
'currentOffset' seems to be OK and 'latestOffset' is correct too, but what about 'earliestOffset'? I would expect it to be at most '5'.
Q4: How can earliestOffset be higher than currentOffset? My only suspicion is that messages were removed from the topic by the retention policy. Are there any other cases in which this could happen?
I was looking for a way to find the lag of a partition, which involves the same steps you have taken. From what I have learnt so far, I can give you some answers.
Q3: You can use
kafka.api.OffsetRequest.CurrentVersion()
to get the versionId, so hardcoding it can be avoided. You can safely use 0 as the correlationId.
Q4: This is strange. When I use EarliestTime() I get 0 as the earliest offset even when my current offset has progressed much further, which means it is the start of the partition. Once some messages expire, the earliest offset becomes a non-zero number. However, if messages had been cleared out by the retention policy, the lag should have changed as well, so I am uncertain about this behaviour. One way to be certain would be to note such a reading, then run the consumer and check its logs. They should show lines like these:
2015-06-09 18:49:15 :: DEBUG :: PartitionTopicInfo:52 :: reset consume offset of requests:2: fetched offset = 405952: consumed offset = 335372 to 335372
2015-06-09 18:49:15 :: DEBUG :: PartitionTopicInfo:52 :: reset consume offset of requests:2: fetched offset = 405952: consumed offset = 335373 to 335373
Note that in the log lines above, the fetched offset stays the same while the consumed offset increases. Eventually it ends with:
2015-06-09 18:49:16 :: DEBUG :: PartitionTopicInfo:52 :: reset consume offset of requests:2: fetched offset = 405952: consumed offset = 405952 to 405952
This would mean that, due to the log retention policy, the offsets from 335372 to 405952 had expired.
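The same check can also be done directly on the three numbers from the question, without reading the consumer logs. The following is a minimal sketch under the assumption that a committed offset lower than the earliest available offset means the messages in between were removed by retention. OffsetRangeCheck, classify and skippedMessages are hypothetical names and not part of the Kafka API.

```java
// Hypothetical helper, not a Kafka class: compares a committed offset
// with the earliest and latest offsets reported for the partition.
final class OffsetRangeCheck {
    enum Position { EXPIRED, IN_RANGE, AHEAD_OF_LOG }

    private OffsetRangeCheck() {}

    static Position classify(long committed, long earliest, long latest) {
        if (earliest > latest) {
            throw new IllegalArgumentException("earliest must not exceed latest");
        }
        if (committed < earliest) {
            return Position.EXPIRED;      // data at the committed offset is gone
        }
        if (committed > latest) {
            return Position.AHEAD_OF_LOG; // committed offset was never written
        }
        return Position.IN_RANGE;
    }

    // Number of offsets between the committed offset and the earliest
    // offset that is still available (0 if nothing was skipped).
    static long skippedMessages(long committed, long earliest) {
        return Math.max(0L, earliest - committed);
    }

    public static void main(String[] args) {
        // Values from the question: currentOffset=5; earliestOffset=29; latestOffset=29
        long committed = 5L, earliest = 29L, latest = 29L;
        System.out.println(classify(committed, earliest, latest)); // prints EXPIRED
        System.out.println(skippedMessages(committed, earliest));  // prints 24
    }
}
```

With the offsets from the log lines above, skippedMessages(335372L, 405952L) gives 70580, which would be the size of the expired range if the assumption holds.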