Kafka Java API offset operations clarification

I'm trying to use the low-level consumer Java API to manage offsets manually, with the latest kafka_2.10-0.8.2.1. To verify that the offsets I commit to and read from Kafka are correct, I use the kafka.tools.ConsumerOffsetChecker tool.

Here is an example of the output for my topic/consumer group:

./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group elastic_search_group --zookeeper localhost:2181 --topic my_log_topic
Group                 Topic         Pid  Offset  logSize  Lag  Owner
elastic_search_group  my_log_topic  0    5       29       24   none

Here is my interpretation of the result:

Offset = 5 --> the current (committed) offset of my 'elastic_search_group' consumer

logSize = 29 --> the latest offset, i.e. the offset of the next message that will be written to this topic/partition

Lag = 24 --> logSize - Offset = 29 - 5, i.e. how many messages have not yet been processed by my 'elastic_search_group' consumer

Pid --> partition ID
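
The arithmetic behind this interpretation can be sketched in plain Java. This is only an illustration and not part of the Kafka API: the class name `OffsetLag` and the method `lag` are hypothetical, and the values are the ones from the ConsumerOffsetChecker output above.

```java
// Hypothetical helper for illustration only; not part of the Kafka API.
final class OffsetLag {

    /**
     * Lag as reported in the checker output: the log size (offset of the
     * next message to be written) minus the consumer group's offset.
     */
    static long lag(long logSize, long consumerOffset) {
        return logSize - consumerOffset;
    }

    public static void main(String[] args) {
        long logSize = 29L;       // "logSize" column
        long consumerOffset = 5L; // "Offset" column
        System.out.println("Lag = " + lag(logSize, consumerOffset)); // prints "Lag = 24"
    }
}
```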

Q1: is this correct?

Now, I want to get the same information from my Java consumer. Here, I found that I had to use two different APIs:

kafka.javaapi.OffsetRequest to get Earliest and Latest offsets, but kafka.javaapi.OffsetFetchRequest to get the current offset.

To get Earliest (or Latest) offset I do:

import java.util.HashMap;
import java.util.Map;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;

// Ask the broker for a single offset at the chosen "time" (earliest or latest).
TopicAndPartition topicAndPartition = new TopicAndPartition(myTopic, myPartition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1));
// OR for Latest: requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
    requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = simpleConsumer.getOffsetsBefore(request);
// Read the result for the same topic/partition that was put into the request.
long[] offsets = response.offsets(myTopic, myPartition);
long myEarliestOffset = offsets[0];
// OR for Latest: long myLatestOffset = offsets[0];

And to get the current offset I have to use a completely different API:

import kafka.javaapi.OffsetFetchRequest;
import kafka.javaapi.OffsetFetchResponse;

short versionID = 0;
int correlationId = 0;
List<TopicAndPartition> topicPartitionList = new ArrayList<TopicAndPartition>();
TopicAndPartition myTopicAndPartition = new TopicAndPartition(myTopic, myPartition);
topicPartitionList.add(myTopicAndPartition);
// The last constructor argument is the client id; the group id is reused for it here.
OffsetFetchRequest offsetFetchReq = new OffsetFetchRequest(
    kafkaGroupId, topicPartitionList, versionID, correlationId, kafkaGroupId);
OffsetFetchResponse offsetFetchResponse = simpleConsumer.fetchOffsets(offsetFetchReq);
// The committed offset for the requested topic/partition.
long currentOffset = offsetFetchResponse.offsets().get(myTopicAndPartition).offset();

Q2: Is this correct? Why are there two different APIs for getting such similar information?

Q3: Does it matter which versionId and correlationId I use here? I thought versionId should be 0 for Kafka versions before 0.8.2.1 and 1 for 0.8.2.1 and later, but it seems to work with 0 for 0.8.2.1 as well (see below).

So, for the example state of the topic above, and the above output of the ConsumerOffsetChecker, here is what I get from my Java code:

currentOffset=5; earliestOffset=29; latestOffset=29

'currentOffset' seems to be OK and 'latestOffset' is correct too, but what about 'earliestOffset'? I would expect it to be at most '5', the current offset.

Q4: How could it happen that the earliestOffset is higher than the currentOffset? My only suspicion is that messages were removed from the topic by the retention policy. Are there any other cases in which this could happen?

Marina asked May 20 '15 18:05


1 Answer

I was looking for a way to find the lag in partitions, which involves the same steps you have taken. Here is what I have learned so far.

  1. logSize indicates how many messages have accumulated in that specific partition; equivalently, it is the maximum offset of messages in that partition. Offset is the offset of the last successfully consumed message. So lag is simply the difference between logSize and Offset.
  2. Yes, it is correct. As far as I know, those are the only two ways of finding the current offset and the earliest or latest offset.
  3. I don't know why versionId needs to be specified. You can use kafka.api.OffsetRequest.CurrentVersion() to get a versionId, so hardcoding can be avoided. You can safely use 0 for correlationId.
  4. This is strange. When I use EarliestTime(), I get 0 as the earliest offset even when my current offset has progressed much further, which means the earliest offset is still the start of the partition. Once some messages expire, the earliest offset will become a non-zero number. However, if messages had been cleared out because of the retention policy, the lag should have changed as well. I am uncertain about this behaviour. One way to be certain would be to run the consumer after noting such a reading and check its logs. It should show lines like these:

    2015-06-09 18:49:15 :: DEBUG :: PartitionTopicInfo:52 :: reset consume offset of requests:2: fetched offset = 405952: consumed offset = 335372 to 335372
    2015-06-09 18:49:15 :: DEBUG :: PartitionTopicInfo:52 :: reset consume offset of requests:2: fetched offset = 405952: consumed offset = 335373 to 335373

Note that in the log lines above, the fetched offset stays the same while the consumed offset increases. Eventually it would end with:

2015-06-09 18:49:16 :: DEBUG :: PartitionTopicInfo:52 :: reset consume offset of requests:2: fetched offset = 405952: consumed offset = 405952 to 405952

That would mean that, due to the log retention policy, the offsets from 335372 to 405952 had expired.
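
The situation asked about in Q4, where the committed offset is lower than the earliest offset still available, can also be detected with a plain range check once the three offsets have been fetched. The following is a minimal sketch under that assumption; `OffsetRangeCheck`, `State`, `classify` and `unavailableMessages` are hypothetical names, not part of the Kafka API, and the sample values are the ones reported in the question.

```java
// Hypothetical helper for illustration only; not part of the Kafka API.
final class OffsetRangeCheck {

    enum State { IN_RANGE, BEHIND_EARLIEST, AHEAD_OF_LATEST }

    /** Compares a committed offset with the earliest and latest offsets of the partition. */
    static State classify(long committed, long earliest, long latest) {
        if (committed < earliest) {
            return State.BEHIND_EARLIEST; // messages before 'earliest' are no longer available
        }
        if (committed > latest) {
            return State.AHEAD_OF_LATEST; // committed offset points past the end of the log
        }
        return State.IN_RANGE;
    }

    /** Number of offsets between the committed offset and the earliest available one. */
    static long unavailableMessages(long committed, long earliest) {
        return Math.max(0L, earliest - committed);
    }

    public static void main(String[] args) {
        // Values from the question: currentOffset=5, earliestOffset=29, latestOffset=29
        System.out.println(classify(5L, 29L, 29L));       // prints "BEHIND_EARLIEST"
        System.out.println(unavailableMessages(5L, 29L)); // prints "24"
    }
}
```

A consumer that sees `BEHIND_EARLIEST` would have to decide where to resume, for example from the earliest available offset; that choice is application-specific and is not made by this sketch.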

Shades88 answered Oct 27 '22 18:10
