Offset missing from Kafka logs - Simple Consumer unable to proceed

I have a 3-node Kafka cluster and I am using Storm to read messages from it. Each topic in my system has 7 partitions.

I am now facing a strange problem. Until 3 days ago, everything was working fine. Now, however, my Storm topology is unable to read from 2 specific partitions: #1 and #4.

I drilled down into the problem and found that in my Kafka logs one offset is missing for each of these partitions: after 5964511, the next offset is 5964513, not 5964512.

Because of the missing offset, SimpleConsumer cannot proceed to the following offsets. Am I doing something wrong, or is this a known bug?

What could be the reason for this behaviour?

I am using the following code to read the window of valid offsets:

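import java.util.HashMap;
import java.util.Map;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

// Note: whichTime is accepted but not used; the request always asks for
// up to 100 offsets before LatestTime().
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                 long whichTime, String clientName) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfoMap = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfoMap.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 100));
    OffsetRequest request = new OffsetRequest(requestInfoMap, kafka.api.OffsetRequest.CurrentVersion(), clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);
    if (response.hasError()) {
        // Fail loudly instead of indexing into an empty offsets array below.
        throw new IllegalStateException("Offset request failed, error code: " + response.errorCode(topic, partition));
    }
    // The code below relies on the offsets being in descending order: largest first, smallest last.
    long[] validOffsets = response.offsets(topic, partition);
    for (long validOffset : validOffsets) {
        System.out.println(validOffset + " : ");
    }
    long largestOffset = validOffsets[0];
    long smallestOffset = validOffsets[validOffsets.length - 1];
    System.out.println(smallestOffset + " : " + largestOffset);
    return largestOffset;
}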

This gives me the following output:

4529948 : 6000878

So the offset I am providing is well within the valid offset range.

Rishabh asked Nov 11 '22 21:11


1 Answer

Sorry for the late answer, but...

I handle this case by keeping a Long instance variable that holds the next offset to read, then checking after each fetch whether the returned FetchResponse hasError(). If there was an error, I change the next offset to a reasonable value (either the next offset or the last available offset) and try again.
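A minimal sketch of that loop, written against plain Java so it runs without a broker. FetchResult and PartitionLog are hypothetical stand-ins for Kafka's FetchResponse and the partition log, not Kafka classes, and the sketch assumes that fetching an absent offset is reported as an error. The recovery policy shown is "try the next offset"; jumping to the last available offset would be a one-line change.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Sketch of "track the next offset, recover on error".
// FetchResult and PartitionLog are hypothetical; they only imitate the
// parts of a fetch that matter for the retry logic.
class OffsetSkipSketch {

    /** Hypothetical result of one fetch: either an error or one message. */
    static final class FetchResult {
        final boolean error;
        final long offset;
        final String payload;
        FetchResult(boolean error, long offset, String payload) {
            this.error = error;
            this.offset = offset;
            this.payload = payload;
        }
        boolean hasError() { return error; }
    }

    /** Hypothetical in-memory partition whose offsets may contain gaps. */
    static final class PartitionLog {
        private final TreeMap<Long, String> messages = new TreeMap<>();
        void append(long offset, String payload) { messages.put(offset, payload); }
        long endOffset() { return messages.isEmpty() ? 0L : messages.lastKey() + 1; }
        // Assumption: an offset that is not in the log yields an error result.
        FetchResult fetch(long offset) {
            String payload = messages.get(offset);
            return payload == null
                    ? new FetchResult(true, offset, null)
                    : new FetchResult(false, offset, payload);
        }
    }

    /** Reads from startOffset to the end of the log, stepping over offsets that fail. */
    static List<Long> readAll(PartitionLog log, long startOffset) {
        List<Long> consumed = new ArrayList<>();
        long nextOffset = startOffset; // plays the role of the instance variable
        while (nextOffset < log.endOffset()) {
            FetchResult result = log.fetch(nextOffset);
            if (result.hasError()) {
                nextOffset++; // recovery policy: move on to the next offset
                continue;
            }
            consumed.add(result.offset);
            nextOffset = result.offset + 1;
        }
        return consumed;
    }

    public static void main(String[] args) {
        PartitionLog log = new PartitionLog();
        log.append(5964510L, "a");
        log.append(5964511L, "b");
        log.append(5964513L, "c"); // 5964512 is missing, as in the question
        System.out.println(readAll(log, 5964510L)); // prints [5964510, 5964511, 5964513]
    }
}
```

With a real SimpleConsumer, the same shape would apply: fetch at nextOffset, check hasError() on the FetchResponse, and only advance nextOffset past messages that were actually returned.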

Chris Gerken answered Nov 14 '22 23:11