I am using ZooKeeper to get data from Kafka, and I always get data starting from the last offset. Is there any way to specify a time for the offset so that I can get old data?
There is one option, autooffset.reset. It accepts smallest or largest. Can someone please explain what smallest and largest mean? Can autooffset.reset help in getting data from an old offset instead of the latest offset?
So, in a default configuration, when you call the poll method, the consumer checks whether it is time to commit. If five seconds have passed since the previous commit, the consumer commits the last offset. In other words, Kafka commits your current offset every five seconds.
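The five-second behaviour described above comes from two consumer settings. A minimal sketch of how they might be set, assuming the newer Java consumer's property names enable.auto.commit and auto.commit.interval.ms (the class and method names are hypothetical, and only java.util.Properties is used so the snippet runs without a Kafka dependency):

```java
import java.util.Properties;

// Hypothetical helper: builds only the auto-commit part of a consumer config.
final class AutoCommitConfigSketch {
    static Properties autoCommitEvery(long intervalMs) {
        Properties props = new Properties();
        // Let the consumer commit offsets on its own during poll().
        props.put("enable.auto.commit", "true");
        // Minimum time between two automatic commits, in milliseconds.
        props.put("auto.commit.interval.ms", Long.toString(intervalMs));
        return props;
    }
}
```

Calling AutoCommitConfigSketch.autoCommitEvery(5000) reproduces the default interval; the result would still need bootstrap servers, a group id and deserializers before it could be passed to a real consumer.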
Kafka maintains a numerical offset for each record in a partition. This offset acts as a unique identifier of a record within that partition, and also denotes the position of the consumer in the partition.
consumer.seekToEnd(Collections.singletonList(topicPartition)); long currentOffset = consumer.position(topicPartition) - 1; The above snippet returns the offset of the last message in the given topic and partition (the end position minus one), which is not necessarily the committed offset.
Consumers always belong to a group and, for each partition, ZooKeeper keeps track of the progress of that consumer group in the partition.
To fetch from the beginning, you can delete all the data associated with that progress, as Hussain referred to:
ZkUtils.maybeDeletePath("${zkhost:zkport}", "/consumers/${group.id}");
You can also specify the offset of partition you want, as specified in core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition, offset.toString)
However, offsets are not indexed by time; within each partition they are simply a sequence.
If your message contains a timestamp (and beware that this timestamp has nothing to do with the moment Kafka received your message), you can build an indexer that reads one entry every N offsets and stores the tuple (topic X, part 2, offset 100, timestamp) somewhere.
When you want to retrieve entries from a specified moment in time, you can apply a binary search to your rough index until you find the entry you want and fetch from there.
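The rough index and the binary search over it could look roughly like this. It is only a sketch: the class and method names are hypothetical, it covers a single partition, and it assumes the message timestamps never decrease as the offset grows (if they can, the search result is only approximate).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sparse (timestamp -> offset) index for one topic partition.
final class RoughOffsetIndex {
    // Parallel lists, filled in increasing offset order.
    private final List<Long> offsets = new ArrayList<>();
    private final List<Long> timestamps = new ArrayList<>();

    /** Record one sampled entry, e.g. every N-th offset read from the partition. */
    void add(long offset, long timestampMs) {
        offsets.add(offset);
        timestamps.add(timestampMs);
    }

    /**
     * Returns the offset of the last sampled entry whose timestamp is <= targetMs.
     * If every sample is newer than targetMs, returns the first sampled offset.
     * Returns -1 when the index is empty.
     */
    long findStartOffset(long targetMs) {
        if (offsets.isEmpty()) {
            return -1L;
        }
        int lo = 0;
        int hi = timestamps.size() - 1;
        int best = 0;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (timestamps.get(mid) <= targetMs) {
                best = mid;      // candidate; a later sample may still qualify
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return offsets.get(best);
    }
}
```

Because the index is sparse, the returned offset is a starting point at or before the wanted moment; the consumer would fetch from there and skip messages until it reaches the timestamp it is looking for.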
The Kafka documentation says: "kafka.api.OffsetRequest.EarliestTime() finds the beginning of the data in the logs and starts streaming from there, kafka.api.OffsetRequest.LatestTime() will only stream new messages. Don’t assume that offset 0 is the beginning offset, since messages age out of the log over time."
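For the high-level consumer, the same earliest/latest choice is what the reset option from the question controls: smallest corresponds to the earliest offset still in the log, largest to the latest. A minimal sketch of such a configuration, assuming the 0.8 property names zookeeper.connect, group.id and auto.offset.reset (the question spells the last one autooffset.reset; the class and method names are hypothetical):

```java
import java.util.Properties;

// Hypothetical helper: config for a ZooKeeper-based consumer that starts
// from the oldest available data when the group has no stored offset.
final class OldConsumerConfigSketch {
    static Properties fromEarliest(String zkConnect, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        props.put("group.id", groupId);
        // "smallest" = earliest offset still in the log, "largest" = latest.
        props.put("auto.offset.reset", "smallest");
        return props;
    }
}
```

Note that this setting only takes effect when the group has no valid offset stored in ZooKeeper, which is why an existing group keeps resuming from its last position until its stored offsets are deleted or a new group id is used.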
Use the SimpleConsumerExample here: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
Similar question: Kafka High Level Consumer Fetch All Messages From Topic Using Java API (Equivalent to --from-beginning)
This might help