On a keyed stream, I would like to compute a window function once per incoming event, as soon as the event arrives, while providing it with all earlier events of the same key from within the last 30 days as an iterator.
The expected behavior is similar to that of a sliding window with 30 days length and 1 nanosecond slide, computing the window function only once per incoming event.
I cannot see how to map this behavior on top of the built-in Tumbling/Sliding/Session windows with/without Triggers/Evictors, etc.
Can anybody help? Or does this require writing my own Window Assigner or my own keyed state handling?
You are right, it is not easy to model your use case with the window primitives that Flink provides.
The best solution I can think of is to implement a custom operator (OneInputStreamOperator). This is a fairly low-level interface which has access to record timestamps, watermarks, and state (many of Flink's built-in operators are based on that interface). When a new record is received, the operator would put it into a priority queue ordered by timestamp, remove all elements older than 30 days, and evaluate the function over the remaining elements in the queue.
Note, the queue should be registered as managed state to make the operator fault-tolerant. If you want to use event time, the operator can only evaluate the function and drop old data when a watermark is received, because only the watermark tells it that no earlier records are still expected.
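To make the buffering logic concrete, here is a minimal plain-Java sketch of the add, evict, and evaluate steps. It is not a Flink operator and uses no Flink API: the names RollingWindowBuffer, Timestamped, and onElement are made up for illustration, the per-key map stands in for what would be managed keyed state, and it assumes events of a key arrive in timestamp order (the processing-on-arrival case, not the watermark-driven one).

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

/** Hypothetical helper: per-key buffer that keeps only the last `window` of events. */
class RollingWindowBuffer<K, V> {

    /** An event value together with its timestamp in epoch milliseconds. */
    static final class Timestamped<V> {
        final long timestamp;
        final V value;

        Timestamped(long timestamp, V value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    private final long windowMillis;
    private final Comparator<Timestamped<V>> byTimestamp =
            Comparator.comparingLong(e -> e.timestamp);

    // In a real operator this map would be replaced by managed keyed state.
    private final Map<K, PriorityQueue<Timestamped<V>>> queues = new HashMap<>();

    RollingWindowBuffer(Duration window) {
        this.windowMillis = window.toMillis();
    }

    /**
     * Adds the event, evicts everything older than the window relative to the
     * new event's timestamp, and returns the remaining values (including the
     * new one) in timestamp order. The caller evaluates its function on them.
     */
    List<V> onElement(K key, long timestamp, V value) {
        PriorityQueue<Timestamped<V>> queue =
                queues.computeIfAbsent(key, k -> new PriorityQueue<>(byTimestamp));
        queue.add(new Timestamped<>(timestamp, value));

        // The queue head is always the oldest element, so eviction is a simple loop.
        long cutoff = timestamp - windowMillis;
        while (!queue.isEmpty() && queue.peek().timestamp < cutoff) {
            queue.poll();
        }

        // PriorityQueue iteration order is not sorted, so copy and sort explicitly.
        List<Timestamped<V>> sorted = new ArrayList<>(queue);
        sorted.sort(byTimestamp);
        List<V> result = new ArrayList<>(sorted.size());
        for (Timestamped<V> e : sorted) {
            result.add(e.value);
        }
        return result;
    }
}
```

With a 30-day window, calling onElement for the same key at day 0, day 10, and day 35 would hand the function one, two, and two values respectively, since the day-0 event has been evicted by the third call. For event time, the same eviction and evaluation would presumably be deferred until a watermark arrives rather than run on every record.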
When implementing the OneInputStreamOperator interface, it might help to peek at Flink's built-in operators, such as StreamFilter or one of the more complex ones.
The custom operator can be applied on a DataStream or KeyedStream (obtained by DataStream.keyBy()) using the transform() method.