I have a 3-node Kafka cluster and use Storm to read messages from it. Each topic in my system has 7 partitions.
Everything worked fine until 3 days ago. Now my Storm topology is unable to read from two specific partitions, #1 and #4.
I drilled down and found that, in the Kafka logs for both of these partitions, one offset is missing: after 5964511 the next offset is 5964513, not 5964512.
Because of the missing offset, SimpleConsumer is not able to proceed to the following offsets. Am I doing something wrong, or is this a known bug?
What could be the reason for this behaviour?
I am using the following code to read the window of valid offsets:
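import java.util.HashMap;
import java.util.Map;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

// Note: whichTime is currently unused; the request always asks for LatestTime().
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                 long whichTime, String clientName) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfoMap = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    // Ask for up to 100 offsets before the latest one to see the valid window.
    requestInfoMap.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 100));
    OffsetRequest request = new OffsetRequest(requestInfoMap, kafka.api.OffsetRequest.CurrentVersion(), clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);
    if (response.hasError()) {
        throw new IllegalStateException("Offset request failed, error code: " + response.errorCode(topic, partition));
    }
    long[] validOffsets = response.offsets(topic, partition);
    for (long validOffset : validOffsets) {
        System.out.println(validOffset + " : ");
    }
    // Offsets come back in descending order: first is the largest, last is the smallest.
    long largestOffset = validOffsets[0];
    long smallestOffset = validOffsets[validOffsets.length - 1];
    System.out.println(smallestOffset + " : " + largestOffset);
    return largestOffset;
}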
This gives me the following output:
4529948 : 6000878
So the offset I am providing is well within the valid offset range.
Sorry for the late answer, but...
I handle this case by keeping a Long instance variable that holds the next offset to read, and checking after each fetch whether the returned FetchResponse hasError(). If there was an error, I change the next offset to a reasonable value (either the next offset or the last available offset) and try again.
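The retry logic described above could look roughly like the sketch below. It is only an illustration and does not use the real Kafka API: `Fetcher` and `FetchResult` are hypothetical stand-ins for `SimpleConsumer.fetch(...)` and `FetchResponse` (with `hasError` mirroring `FetchResponse.hasError()`), and "skip to the next offset" is just one of the recovery choices mentioned. The simulated partition is missing offset 5964512, as in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class OffsetSkippingReader {

    /** Hypothetical stand-in for a fetch response for a single offset. */
    static final class FetchResult {
        final boolean hasError;
        final long offset;

        FetchResult(boolean hasError, long offset) {
            this.hasError = hasError;
            this.offset = offset;
        }
    }

    /** Hypothetical stand-in for a consumer that fetches one offset at a time. */
    interface Fetcher {
        FetchResult fetch(long offset);
    }

    /** Reads [startOffset, endOffsetExclusive), skipping any offset whose fetch reports an error. */
    static List<Long> readRange(Fetcher fetcher, long startOffset, long endOffsetExclusive) {
        List<Long> read = new ArrayList<>();
        long nextOffset = startOffset; // the "next offset to read" variable
        while (nextOffset < endOffsetExclusive) {
            FetchResult result = fetcher.fetch(nextOffset);
            if (result.hasError) {
                // Recovery choice (an assumption): move on to the next offset and retry.
                nextOffset++;
                continue;
            }
            read.add(result.offset);
            nextOffset = result.offset + 1;
        }
        return read;
    }

    public static void main(String[] args) {
        // Simulated partition in which offset 5964512 does not exist.
        Set<Long> present = new TreeSet<>(Arrays.asList(5964510L, 5964511L, 5964513L, 5964514L));
        Fetcher fetcher = offset -> new FetchResult(!present.contains(offset), offset);
        // prints [5964510, 5964511, 5964513, 5964514]
        System.out.println(readRange(fetcher, 5964510L, 5964515L));
    }
}
```

In a real consumer you would probably also inspect the error code before skipping, since not every fetch error means the offset is missing.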