 

How to process logs from a distributed log broker (e.g. Kafka) exactly 1 week later?

What setup can I use to process logs from Kafka that are exactly 1 week old?

The use case is that I maintain cumulative stats of the last week's user activity. I am fine with eventual consistency and don't need the stats to cover exactly 1 week.

I have a streaming setup which processes the incoming logs from Kafka and updates the stats. Any activity older than 1 week should be deleted from the stats. One way I can achieve this is with a batch process (e.g. Spark) that removes activity older than 1 week from the stats.

Is there any way I can use stream processing to remove user activity older than 1 week from the stats? What are the pros and cons of the various approaches?

If I use at-least-once delivery in Kafka and the stats deviate from the ground truth, what are the ways to correct the stats regularly?

asked Nov 07 '22 by raju

1 Answer

If your Kafka messages have a proper timestamp, then you can look up the offsets that correspond to a timestamp from the previous week. For that you can use:

Map<TopicPartition,OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition,Long> timestampsToSearch)

The documentation says:

Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.

To get the list of topic partitions, you can call consumer.assignment(), which returns the Set<TopicPartition> assigned to the consumer. Note that after subscribe() the assignment is empty until the first poll() triggers a partition assignment, whereas assign() takes effect immediately. The Long value in the map is the timestamp to search for; in your case it will be the same value for every key (i.e. the 1-week-old timestamp).

Now that you have a Map<TopicPartition, OffsetAndTimestamp>, you can use seek(TopicPartition partition, long offset) to move each partition to its offset.

consumer.subscribe(topics);
consumer.poll(Duration.ofMillis(100)); // assignment() is empty right after subscribe(); poll once so partitions get assigned
Set<TopicPartition> partitions = consumer.assignment();
Map<TopicPartition, Long> timestampsToSearch = new LinkedHashMap<>();
partitions.forEach(partition -> timestampsToSearch.put(partition, oneWeekOldTimestamp));
Map<TopicPartition, OffsetAndTimestamp> offsetsMap = consumer.offsetsForTimes(timestampsToSearch);
offsetsMap.forEach((partition, offsetAndTimestamp) -> {
    if (offsetAndTimestamp != null) { // null if the partition has no message at or after the timestamp
        consumer.seek(partition, offsetAndTimestamp.offset());
    }
});

Now your consumer is positioned at the first message that is at most one week old, so when you poll(), you read from last week until now.
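As a minimal sketch, a poll loop from that position could look like this (updateStats is a hypothetical stand-in for your own aggregation logic):

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    // Every record is at most one week old thanks to the seek above
    records.forEach(record -> updateStats(record)); // updateStats: your aggregation logic
}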

You can change the timestamp to meet your requirements; for example, "anything older than 1 week" means everything from timestamp 0 up to the 1-week-old timestamp.

"All of the previous week's data" means the range from 2weekOldTimestamp to 1weekOldTimestamp.

So, in this case you have to seek to the offsets for 2weekOldTimestamp and then process each partition until you encounter a record whose timestamp reaches 1weekOldTimestamp, as sketched below.
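A minimal sketch of that bounded read, assuming twoWeekOldTimestamp and oneWeekOldTimestamp are already computed (processRecord is a hypothetical placeholder for your own handling logic):

// Find the starting offsets (two weeks back) and seek there
Map<TopicPartition, Long> startTimestamps = new HashMap<>();
partitions.forEach(p -> startTimestamps.put(p, twoWeekOldTimestamp));
consumer.offsetsForTimes(startTimestamps).forEach((partition, oat) -> {
    if (oat != null) consumer.seek(partition, oat.offset());
});

// Poll and stop consuming a partition once its records reach the 1-week boundary
Set<TopicPartition> remaining = new HashSet<>(partitions);
while (!remaining.isEmpty()) {
    for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        if (record.timestamp() >= oneWeekOldTimestamp) {
            remaining.remove(tp);
            consumer.pause(Collections.singleton(tp)); // stop fetching from this partition
        } else {
            processRecord(record); // processRecord: your own handling logic
        }
    }
}

A real implementation would also stop when a partition is exhausted (e.g. by comparing positions against endOffsets()), so the loop doesn't spin if a partition never reaches the boundary.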

answered Nov 09 '22 by JavaTechnical