
How shall we read the Kafka topics in a given time range?

Tags:

apache-kafka

I need to read the messages in a given time range out of a Kafka topic. The solution I can think of is to first find the largest offset for the beginning of the time range, and then continue to consume messages until the offsets on all partitions are past the end of the time range. Is there a better approach to this problem? Thanks!

yuyang asked Jul 15 '14 21:07


1 Answer

You do have to start by finding the first offset that falls at or after the start of the time range.

This can be done using the KafkaConsumer#offsetsForTimes method.

The method accepts a Map<TopicPartition, Long> that maps each partition to a timestamp, and returns a Map<TopicPartition, OffsetAndTimestamp>, where each OffsetAndTimestamp describes the first message whose timestamp is equal to or greater than the one specified. If a partition has no such message, its value in the returned map is null.

From there, you can assign the partition to your consumer, seek to the returned offset, and iterate until the timestamp of a record exceeds the end of your time range.
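To make the lookup-then-scan idea concrete without a broker, here is a minimal, Kafka-free model in plain Java. It treats a partition as a list of timestamps in which the list index stands in for the offset, and it assumes timestamps never decrease within the partition. The names TimeRangeModel, firstOffsetAtOrAfter and offsetsInRange are made up for illustration and are not part of the Kafka API; only the "first timestamp equal to or greater than the target, or nothing" behaviour is meant to mirror offsetsForTimes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TimeRangeModel {

    // Index (standing in for the offset) of the first timestamp >= target,
    // or -1 when no such entry exists (the model's analogue of a null result).
    public static int firstOffsetAtOrAfter(List<Long> timestamps, long target) {
        int lo = 0;
        int hi = timestamps.size();
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (timestamps.get(mid) < target) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo < timestamps.size() ? lo : -1;
    }

    // Offsets whose timestamps fall in [begin, end], scanning forward from the
    // looked-up start and stopping at the first timestamp past the end.
    public static List<Integer> offsetsInRange(List<Long> timestamps, long begin, long end) {
        List<Integer> result = new ArrayList<>();
        int start = firstOffsetAtOrAfter(timestamps, begin);
        if (start < 0) {
            return result; // nothing at or after the beginning of the range
        }
        for (int offset = start; offset < timestamps.size(); offset++) {
            if (timestamps.get(offset) > end) {
                break;
            }
            result.add(offset);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> partition = Arrays.asList(100L, 200L, 200L, 350L, 500L);
        System.out.println(offsetsInRange(partition, 150L, 400L)); // prints [1, 2, 3]
    }
}
```

If timestamps in a real partition can arrive out of order, stopping at the first record past the end may skip later records that are still in range, so the early break is only safe under the ordering assumption stated above.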

Some pseudo code:

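import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

public static void main(String[] args) {
    String topic = args[0]; // args are zero-indexed
    long timestampBeginning = Long.parseLong(args[1]); // epoch milliseconds
    long timestampEnd = Long.parseLong(args[2]);
    TopicPartition partition = new TopicPartition(topic, 0); // partition 0 only

    // createConsumer() is a placeholder for building a configured KafkaConsumer
    Consumer<Object, Object> consumer = createConsumer();

    // The map value is null when no message has a timestamp >= timestampBeginning
    OffsetAndTimestamp start = consumer.offsetsForTimes(
            Collections.singletonMap(partition, timestampBeginning))
                    .get(partition);
    if (start == null) {
        consumer.close();
        return; // nothing to read in this range
    }

    consumer.assign(Collections.singleton(partition)); // must assign before seeking
    consumer.seek(partition, start.offset());

    boolean done = false;
    while (!done) {
        ConsumerRecords<Object, Object> records = consumer.poll(Duration.ofSeconds(1));
        if (records.isEmpty()) {
            break; // simplification: treat an empty poll as the end of the data
        }
        for (ConsumerRecord<Object, Object> record : records) {
            if (record.timestamp() > timestampEnd) {
                done = true; // past the end of the time range
                break;
            }

            // handle record
        }
    }
    consumer.close();
}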
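
The code above reads only partition 0, while the question asks about stopping once every partition is past the end of the range. One possible way to track that is sketched below as a Kafka-free model in plain Java: each partition is identified by an int, and a partition counts as finished as soon as one of its records has a timestamp beyond the end. RangeEndTracker and its methods are hypothetical names, not Kafka classes, and the sketch again assumes timestamps do not decrease within a partition.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class RangeEndTracker {
    private final long endTimestamp;
    private final Set<Integer> active = new HashSet<>();

    public RangeEndTracker(Set<Integer> partitions, long endTimestamp) {
        this.active.addAll(partitions);
        this.endTimestamp = endTimestamp;
    }

    // Returns true when the record is still inside the range and should be handled.
    // The first record past the end marks its partition as finished.
    public boolean accept(int partition, long timestamp) {
        if (!active.contains(partition)) {
            return false; // partition already finished, ignore further records
        }
        if (timestamp > endTimestamp) {
            active.remove(partition);
            return false;
        }
        return true;
    }

    // True once every tracked partition has produced a record past the end.
    public boolean isDone() {
        return active.isEmpty();
    }

    public static void main(String[] args) {
        RangeEndTracker tracker =
                new RangeEndTracker(new HashSet<>(Arrays.asList(0, 1)), 400L);
        System.out.println(tracker.accept(0, 300L)); // prints true
        System.out.println(tracker.accept(1, 450L)); // prints false
        System.out.println(tracker.isDone());        // prints false
        System.out.println(tracker.accept(0, 500L)); // prints false
        System.out.println(tracker.isDone());        // prints true
    }
}
```

With a real consumer, the poll loop would run until isDone() returns true, and a finished partition could be paused with KafkaConsumer#pause so that it stops returning records. A partition that never receives a record past the end would keep the loop running, so some extra stop condition, such as a timeout, would be needed in practice.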
Giora Guttsait answered Nov 03 '22 14:11