I need to read the messages in a given time range out of a Kafka topic. The solution I can think of is to first find the largest offset for the beginning of the time range, and then keep consuming messages until the offsets on all partitions are past the end of the time range. Is there a better approach to this problem? Thanks!
Well, you definitely have to start by finding the first offset that falls at the opening of the time range.
This can be done using the KafkaConsumer#offsetsForTimes method.
The method accepts a Map<TopicPartition, Long> (partition to timestamp) and returns a Map<TopicPartition, OffsetAndTimestamp>, where each OffsetAndTimestamp holds the offset and timestamp of the first message whose timestamp is equal to or greater than the one specified. If a partition has no such message, its entry in the returned map is null.
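To make the lookup rule concrete, here is a toy model in plain Java. It is not Kafka code: the list index stands in for the offset, the values stand in for record timestamps, and the names TimestampLookup and firstOffsetAtOrAfter are made up for this illustration.

```java
import java.util.List;
import java.util.OptionalLong;

// Toy model of one partition: list index = offset, value = record timestamp (ms).
// Hypothetical helper for illustration only; the real lookup is done by the broker.
final class TimestampLookup {
    // Returns the earliest offset whose timestamp is >= target,
    // or an empty result if no record qualifies (Kafka returns null in that case).
    static OptionalLong firstOffsetAtOrAfter(List<Long> timestamps, long target) {
        for (int offset = 0; offset < timestamps.size(); offset++) {
            if (timestamps.get(offset) >= target) {
                return OptionalLong.of(offset);
            }
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        List<Long> partition = List.of(100L, 200L, 200L, 350L);
        // Exact match: the first of the two records stamped 200 is at offset 1.
        System.out.println(firstOffsetAtOrAfter(partition, 200L).getAsLong());
        // No exact match: the next later record (350) is at offset 3.
        System.out.println(firstOffsetAtOrAfter(partition, 201L).getAsLong());
        // Nothing at or after 400, so there is no offset to seek to.
        System.out.println(firstOffsetAtOrAfter(partition, 400L).isPresent());
    }
}
```

The empty case is the one to watch for in real code: calling offset() on a missing entry would throw a NullPointerException.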
From there, you can assign the partition to your consumer, seek to the returned offset, and iterate until the timestamp in the record exceeds the end of your time range.
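A sketch in Java (it reads partition 0 only, and createConsumer() is a placeholder for your own consumer setup):
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class TimeRangeReader {
    public static void main(String[] args) {
        String topic = args[1];
        long timestampBeginning = Long.parseLong(args[2]); // epoch milliseconds
        long timestampEnd = Long.parseLong(args[3]);       // epoch milliseconds
        TopicPartition partition = new TopicPartition(topic, 0);
        try (Consumer<Object, Object> consumer = createConsumer()) {
            OffsetAndTimestamp start = consumer.offsetsForTimes(
                    Collections.singletonMap(partition, timestampBeginning)).get(partition);
            if (start == null) return; // no record at or after timestampBeginning
            consumer.assign(Collections.singleton(partition)); // must assign before seeking
            consumer.seek(partition, start.offset());
            while (true) {
                ConsumerRecords<Object, Object> records = consumer.poll(Duration.ofSeconds(1));
                if (records.isEmpty()) break; // nothing arrived in time; assume end of log
                for (ConsumerRecord<Object, Object> record : records) {
                    if (record.timestamp() > timestampEnd) return; // past the end of the range
                    // handle record
                }
            }
        }
    }

    static Consumer<Object, Object> createConsumer() {
        throw new UnsupportedOperationException("build a KafkaConsumer with your own config here");
    }
}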
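The code above reads a single partition, while the question asks about all partitions. One way to extend it is to track which partitions have already passed the end of the range and stop once all of them have. The sketch below models only that bookkeeping in plain Java, with no Kafka dependency: Rec and RangeReader are hypothetical names, each inner list stands in for one poll() batch, and it assumes (as the code above does) that timestamps do not go backwards within a partition.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy stand-in for a consumed record; a real consumer would use ConsumerRecord.
final class Rec {
    final int partition;
    final long timestamp;
    final String value;

    Rec(int partition, long timestamp, String value) {
        this.partition = partition;
        this.timestamp = timestamp;
        this.value = value;
    }
}

final class RangeReader {
    // Collects records with timestamp <= end from successive batches and stops
    // once every partition in `partitions` has produced a record past `end`.
    static List<Rec> readUntilEnd(Set<Integer> partitions, List<List<Rec>> batches, long end) {
        Set<Integer> finished = new HashSet<>();
        List<Rec> inRange = new ArrayList<>();
        for (List<Rec> batch : batches) {
            for (Rec r : batch) {
                if (finished.contains(r.partition)) {
                    continue; // this partition is already past the end of the range
                }
                if (r.timestamp > end) {
                    finished.add(r.partition); // first record past the end: partition done
                } else {
                    inRange.add(r);
                }
            }
            if (finished.containsAll(partitions)) {
                break; // every partition is past the end, so stop polling
            }
        }
        return inRange;
    }
}
```

With a real consumer, the starting offsets would come from one offsetsForTimes call covering every partition, and a finished partition could be paused with consumer.pause so it stops returning records. A partition that never receives a record past the end of the range would keep this loop waiting, so a real implementation needs a second stop condition, for example comparing positions against consumer.endOffsets.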