What setup can I use to process logs from Kafka that are exactly 1 week old?
Use case: I maintain cumulative stats of the last week's user activity. I am fine with eventual consistency and don't need the stats to cover exactly 1 week.
I have a streaming setup that processes incoming logs from Kafka and updates the stats. Any activity older than 1 week should be removed from the stats. One way I can achieve this is with a batch process (e.g. Spark) that removes activity older than 1 week from the stats.
Is there a way to use stream processing to remove user activity older than 1 week from the stats? What are the pros and cons of the various approaches?
Also, if I use at-least-once delivery in Kafka and the stats deviate from the ground truth, what are the ways to correct the stats regularly?
If your Kafka messages carry a proper timestamp, you can look up the offsets corresponding to a timestamp one week in the past. For that you can use:
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)
The documentation says:
Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
To get the list of topic partitions, call consumer.assignment() (after subscribe() or assign()), which returns the Set<TopicPartition> assigned to the consumer. The Long value in the map is the timestamp to search for, so in your case every key maps to the same value: the 1-week-old timestamp.
Once you have the Map<TopicPartition, OffsetAndTimestamp>, use seek(TopicPartition partition, long offset) to seek to each returned offset.
consumer.subscribe(topics);
consumer.poll(Duration.ZERO); // trigger partition assignment; assignment() is empty right after subscribe()
Set<TopicPartition> partitions = consumer.assignment();
Map<TopicPartition, Long> map = new LinkedHashMap<>();
partitions.forEach(partition -> map.put(partition, oneWeekOldTimestamp));
Map<TopicPartition, OffsetAndTimestamp> offsetsMap = consumer.offsetsForTimes(map);
// offsetsForTimes() maps a partition to null if it has no message at or after the timestamp
offsetsMap.forEach((partition, offsetAndTimestamp) -> {
    if (offsetAndTimestamp != null) {
        consumer.seek(partition, offsetAndTimestamp.offset());
    }
});
Now your consumer is positioned at the first message that is at most one week old, so when you poll(), you receive messages from one week ago until now.
You can adjust the timestamps to meet other requirements. For example, "anything older than 1 week" means the range from timestamp 0 to the 1-week-old timestamp, and "all of the previous week" means the range from the 2-week-old timestamp to the 1-week-old timestamp. In the latter case, seek to the offsets returned for the 2-week-old timestamp and process each partition until you encounter a record whose timestamp passes the 1-week-old timestamp.
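The previous-week range can be sketched with a small helper that checks whether a record's timestamp falls in [2 weeks ago, 1 week ago). The class and method names (WindowBounds, inPreviousWeek) are my own for illustration; in practice you would apply this check to each record's timestamp inside the poll loop and stop processing a partition once its records pass the 1-week boundary.

```java
import java.time.Duration;
import java.time.Instant;

public class WindowBounds {
    static final long WEEK_MS = Duration.ofDays(7).toMillis();

    // True if recordTimestampMs falls in [now - 2 weeks, now - 1 week),
    // i.e. the record belongs to the previous week's window.
    static boolean inPreviousWeek(long recordTimestampMs, long nowMs) {
        long oneWeekAgo = nowMs - WEEK_MS;
        long twoWeeksAgo = nowMs - 2 * WEEK_MS;
        return recordTimestampMs >= twoWeeksAgo && recordTimestampMs < oneWeekAgo;
    }

    public static void main(String[] args) {
        long now = Instant.parse("2024-01-15T00:00:00Z").toEpochMilli();
        // A record from 10 days ago is inside the previous week's window:
        System.out.println(inPreviousWeek(now - Duration.ofDays(10).toMillis(), now)); // true
        // A record from 3 days ago belongs to the current week, not the previous one:
        System.out.println(inPreviousWeek(now - Duration.ofDays(3).toMillis(), now)); // false
    }
}
```

Note that this compares against wall-clock "now"; if you instead anchor the window to the timestamps you passed to offsetsForTimes(), substitute those same values for nowMs-derived bounds so the seek range and the filter agree.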