We are using Kafka Streams' SessionWindows to aggregate the arrival of related events, and along with the aggregation we specify the retention time for the window using the until() API.
Stream info:
The session window (inactivity time) is 1 minute and the retention time passed to until() is 2 minutes.
We are using a custom TimestampExtractor to derive each event's timestamp.
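For reference, here is a minimal sketch of such a topology, assuming the Kafka 1.0-era DSL (where SessionWindows still offers until()); the topic name, payload format, and the extractor itself are illustrative assumptions, not our actual code:

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.state.SessionStore;

// Hypothetical extractor: assumes the record value starts with the event
// time in epoch millis, followed by a comma and the rest of the payload.
class EventTimeExtractor implements TimestampExtractor {
    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
        final String value = (String) record.value();
        return Long.parseLong(value.split(",", 2)[0]);
    }
}

public class SessionAggregationExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "session-aggregation");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, EventTimeExtractor.class);

        final StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("events")
            .groupByKey()
            .windowedBy(SessionWindows
                .with(TimeUnit.MINUTES.toMillis(1))     // 1 minute inactivity gap
                .until(TimeUnit.MINUTES.toMillis(2)))   // 2 minutes retention (lower bound)
            .aggregate(
                () -> "",                               // initializer
                (key, value, agg) -> agg + "|" + value, // aggregator
                (key, agg1, agg2) -> agg1 + agg2,       // merger for overlapping sessions
                Materialized.<String, String, SessionStore<Bytes, byte[]>>as("session-store")
                    .withValueSerde(Serdes.String()));

        new KafkaStreams(builder.build(), props).start();
    }
}
```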
Example:
Event e1: eventTime 10:00:00 AM; arrivalTime 2:00 PM (same day)
Event e2: eventTime 10:00:30 AM; arrivalTime 2:10 PM (same day)
The arrival time of e2 is 10 minutes after the arrival of e1, which exceeds retention time + inactivity time. Yet the older event e1 is still part of the aggregation, despite the retention time being 2 minutes.
Questions:
1) How does Kafka Streams clean up the state store when until() is used? The retention value passed as an argument is documented as a "lower bound for how long a window will be maintained" — so when exactly is a window purged?
2) Is there a background thread that cleans up the state store periodically? If yes, is there a way to identify the actual time when a window is purged?
3) Is there any streams configuration that would purge a window's data as soon as the retention time passes?
Before I answer your concrete questions, note that retention time is not based on system time, but on "stream time". "Stream time" is an internally tracked progress of time based on whatever the TimestampExtractor returns. Without going into too much detail: for your example with two records, "stream time" will advance by only 30 seconds when the second record arrives, and thus the retention time has not passed yet.
Also note that "stream time" is not advanced if no new data arrives (for at least one partition). This holds for Kafka 0.11.0 and older, but might change in future releases.
Update: the computation of stream time was changed in Kafka 2.1, and stream time may advance even if one partition does not deliver data. For details see KIP-353: Improve Kafka Streams Timestamp Synchronization.
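To make this concrete, here is a minimal sketch (not Kafka's actual code) of how stream time progresses as the maximum event timestamp seen so far:

```java
// Minimal sketch (not Kafka's actual code): "stream time" is tracked as
// the maximum event timestamp observed so far, independent of wall-clock time.
long streamTime = Long.MIN_VALUE;

long advance(final long recordTimestamp) {
    streamTime = Math.max(streamTime, recordTimestamp);
    return streamTime;
}

// e1 with eventTime 10:00:00 -> streamTime = 10:00:00
// e2 with eventTime 10:00:30 -> streamTime = 10:00:30
// Stream time advanced by only 30 seconds, even though the two records
// arrived hours apart in wall-clock time, so the session's expiry point
// (window end + 2 minutes retention) has not been reached and e1 is kept.
```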
To your questions:
(1) Kafka Streams writes all store updates into a changelog topic and a local RocksDB store. Both are divided into so-called segments of a certain size. As new data arrives (i.e., "stream time" progresses), new segments are created; when this happens, older segments are deleted if and only if all records in an old segment are older than the retention time (i.e., their record timestamps are smaller than "stream time" minus retention time). See the sketch after point (3) below.
(2) Thus, there is no background thread; cleanup is part of regular processing. And (3) there is no configuration to force purging of older records/windows.
Because whole segments are dropped only once all their records have expired, the older records within a segment (with most likely smaller/older timestamps) are maintained longer than the retention time. The motivation behind this design is performance: expiring on a per-record basis would be too expensive.
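A simplified sketch of this segment-based expiry, with an assumed segment size and stub data types (illustrative only, not Kafka's actual implementation):

```java
import java.util.TreeMap;

// Simplified sketch of segment-based expiry; segment size and stored data
// types are assumptions for illustration, not Kafka's actual implementation.
class SegmentedStoreSketch {
    static final long SEGMENT_INTERVAL_MS = 60_000L; // assumed segment size
    final long retentionMs;
    final TreeMap<Long, StringBuilder> segments = new TreeMap<>(); // segmentId -> data stub

    SegmentedStoreSketch(final long retentionMs) {
        this.retentionMs = retentionMs;
    }

    static long segmentId(final long timestamp) {
        return timestamp / SEGMENT_INTERVAL_MS;
    }

    // Cleanup piggybacks on regular writes: there is no background thread.
    void put(final long recordTimestamp, final String value, final long streamTime) {
        segments.computeIfAbsent(segmentId(recordTimestamp), id -> new StringBuilder())
                .append(value);
        // A segment is dropped only when ALL its records are expired, i.e.,
        // when even its newest possible timestamp is below streamTime - retentionMs.
        final long minLiveSegment = segmentId(streamTime - retentionMs);
        segments.headMap(minLiveSegment).clear();
    }
}
```

Because expiry happens at segment granularity, a record can survive for up to roughly the retention time plus one segment interval, which is why until() is documented only as a lower bound.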