
Apache Flink session support

I am investigating Apache Flink Streaming for use in our platform for ETL and machine learning. What I haven't figured out yet is how to group streamed events into 'sessions'. More precisely: every event contains a session id, and to enrich the data I need to group all events belonging to a session together. Please keep in mind that events stream in continuously (so there is no batch after which you could simply do a groupBy, for example).

One possible solution could be to maintain an LRU cache of sessions and sort each incoming event into its session. After X minutes of inactivity, a session can then be 'closed', i.e. evicted from the cache. The question is how to handle this cache in a multi-tenant system: does Flink have the concept of a distributed cache, or does it have some sort of smart load balancer that directs events with the same session to the same partition in the grid?
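For reference, the cache idea above can be sketched in plain Java for a single process. This is not Flink API: the class `IdleSessionCache` and its methods are hypothetical, and the clock is passed in explicitly (`nowMillis`) so the example is deterministic. It uses a `LinkedHashMap` in access order, so the least recently active session is always first and eviction can stop at the first session that is still active.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-process sketch of the "LRU cache of sessions" idea (no Flink API).
// Events are grouped by session id; a session is closed once it has been idle
// for longer than idleTimeoutMillis.
class IdleSessionCache {
    // One open session: its buffered events and the time of its most recent event.
    static final class Session {
        final String id;
        final List<String> events = new ArrayList<>();
        long lastSeenMillis;

        Session(String id) { this.id = id; }
    }

    private final long idleTimeoutMillis;
    // accessOrder = true: the least recently touched session comes first in iteration.
    private final LinkedHashMap<String, Session> open = new LinkedHashMap<>(16, 0.75f, true);

    IdleSessionCache(long idleTimeoutMillis) { this.idleTimeoutMillis = idleTimeoutMillis; }

    // Sort an incoming event into its session, creating the session if needed.
    // get() on an access-ordered map moves an existing session to the end of the order.
    void add(String sessionId, String event, long nowMillis) {
        Session s = open.get(sessionId);
        if (s == null) {
            s = new Session(sessionId);
            open.put(sessionId, s);
        }
        s.events.add(event);
        s.lastSeenMillis = nowMillis;
    }

    // Close every session idle for longer than the timeout. With non-decreasing
    // nowMillis values, access order equals last-seen order, so the scan stops
    // at the first session that is still active.
    List<Session> evictIdle(long nowMillis) {
        List<Session> closed = new ArrayList<>();
        Iterator<Map.Entry<String, Session>> it = open.entrySet().iterator();
        while (it.hasNext()) {
            Session s = it.next().getValue();
            if (nowMillis - s.lastSeenMillis <= idleTimeoutMillis) break;
            closed.add(s);
            it.remove();
        }
        return closed;
    }

    int openSessions() { return open.size(); }

    public static void main(String[] args) {
        IdleSessionCache cache = new IdleSessionCache(60_000); // 1 minute of inactivity
        cache.add("s1", "click", 0);
        cache.add("s2", "view", 10_000);
        cache.add("s1", "buy", 30_000);
        // At t = 80 s, s2 has been idle for 70 s and is closed; s1 (idle 50 s) stays open.
        for (Session s : cache.evictIdle(80_000)) {
            System.out.println(s.id + " " + s.events); // s2 [view]
        }
    }
}
```

This only covers one process; it does not address how such a cache would be spread across a cluster, which is the multi-tenant part of the question.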

More generally: what is the best way (use cases and pitfalls) to implement session support using the streaming API? Is this possible at all? And how should replaying streams be handled? (i.e. starting from a particular point in time, where the stream contains incomplete sessions whose earlier events lie before that point)

Interested in any feedback, ideas and/or pointers.

Thanks in advance

Leonard Wolters asked Apr 08 '26 21:04


1 Answer

I created an example that may be quite close to what you need: https://gist.github.com/aljoscha/91b6422114eac814479f

It uses a Tuple2<Integer, String> to simulate data. The integer is the session ID, and the String is a field on which we key (partition) the data.
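To illustrate what keying (partitioning) buys you, here is a hypothetical plain-Java sketch. `Event` is a stand-in for `Tuple2<Integer, String>` (`f0` = session ID, `f1` = key field), and `partitionFor` is a simplified hash partitioner; Flink's actual key-to-partition assignment differs in detail. The point it shows is only that every event with the same key always lands in the same partition, so per-key state never has to be shared across machines.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of hash partitioning by key (no Flink dependency).
class KeyedRouting {
    // Stand-in for Tuple2<Integer, String>: f0 is the session ID, f1 is the key field.
    static final class Event {
        final int f0;
        final String f1;

        Event(int f0, String f1) { this.f0 = f0; this.f1 = f1; }

        @Override public String toString() { return "(" + f0 + "," + f1 + ")"; }
    }

    // Simplified partitioner: the same key always maps to the same partition index.
    static int partitionFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Route events to partitions; TreeMap keeps the partitions printed in index order.
    static Map<Integer, List<Event>> route(List<Event> events, int parallelism) {
        Map<Integer, List<Event>> partitions = new TreeMap<>();
        for (Event e : events) {
            partitions.computeIfAbsent(partitionFor(e.f1, parallelism), p -> new ArrayList<>()).add(e);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(new Event(1, "a"), new Event(2, "b"), new Event(1, "a"), new Event(3, "c"));
        route(events, 4).forEach((p, evs) -> System.out.println("partition " + p + ": " + evs));
        // partition 1: [(1,a), (1,a)]
        // partition 2: [(2,b)]
        // partition 3: [(3,c)]
    }
}
```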

I suggest you first take a look at the main() method; there you can see the flow of the program. The other parts are the custom window definition SessionWindow, a window assigner, and the SessionTrigger. Together they basically implement the cache idea you suggested. The windows are kept in buffers based on the assigned window and the key. Once the trigger fires, the window is processed and its contents are evicted.
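The buffering described above can be modelled in a few lines of plain Java. This is a hypothetical sketch, not the gist's code or Flink's windowing internals: `WindowBuffers`, `add` and `fire` are illustrative names, and a window is represented by an arbitrary value (here an integer ID). Elements are appended to the buffer for their (key, window) pair; when the trigger fires for that pair, the buffer is handed to a window function and then evicted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of per-(key, window) buffers that are processed and evicted on fire.
class WindowBuffers<K, W, T, R> {
    private final Map<List<Object>, List<T>> buffers = new HashMap<>();

    // Composite map key for one (key, window) pair.
    private static List<Object> id(Object key, Object window) {
        return List.of(key, window);
    }

    // Buffer an element under its key and assigned window.
    void add(K key, W window, T element) {
        buffers.computeIfAbsent(id(key, window), k -> new ArrayList<>()).add(element);
    }

    // Called when the trigger fires: remove (evict) the buffer, then process its contents.
    R fire(K key, W window, Function<List<T>, R> windowFunction) {
        List<T> contents = buffers.remove(id(key, window));
        if (contents == null) contents = new ArrayList<>();
        return windowFunction.apply(contents);
    }

    int bufferedWindows() { return buffers.size(); }

    public static void main(String[] args) {
        WindowBuffers<String, Integer, Integer, Integer> buffers = new WindowBuffers<>();
        buffers.add("a", 1, 10);
        buffers.add("a", 1, 5);
        buffers.add("b", 1, 7);
        // Trigger fires for key "a", window 1: sum the contents; that buffer is evicted.
        int sum = buffers.fire("a", 1, xs -> xs.stream().mapToInt(Integer::intValue).sum());
        System.out.println(sum + " " + buffers.bufferedWindows()); // 15 1
    }
}
```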

When the trigger receives an element, it registers a timer for 10 seconds in the future. If no new element arrives by then, the trigger fires. If a new element arrives within that time, the trigger registers a new timer, which replaces the old one because a trigger can only have one active timer at a time.
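The timer behaviour described above can be simulated in plain Java with an explicit clock. This is a hypothetical sketch, not Flink's Trigger API: `InactivityTimers`, `onElement` and `advanceTo` are made-up names, and keeping a single deadline per key in a map models the "one active timer, replaced on every new element" rule.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of an inactivity trigger: at most one pending timer per key,
// re-registered at now + gap on every element; it fires if no newer element arrives.
class InactivityTimers {
    private final long gapMillis;
    private final Map<String, Long> timerByKey = new HashMap<>(); // one timer per key

    InactivityTimers(long gapMillis) { this.gapMillis = gapMillis; }

    // A new element replaces any older timer for the same key.
    void onElement(String key, long nowMillis) {
        timerByKey.put(key, nowMillis + gapMillis);
    }

    // Advance the clock; return (and clear) the keys whose timer has been reached.
    List<String> advanceTo(long nowMillis) {
        List<String> fired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = timerByKey.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() <= nowMillis) {
                fired.add(e.getKey());
                it.remove();
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        InactivityTimers timers = new InactivityTimers(10_000); // 10 s gap, as in the answer
        timers.onElement("s1", 0);     // timer at 10 s
        timers.onElement("s1", 8_000); // replaces it: timer now at 18 s
        System.out.println(timers.advanceTo(12_000)); // [] (the 10 s timer no longer exists)
        System.out.println(timers.advanceTo(18_000)); // [s1]
    }
}
```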

Also, this uses what are called processing-time windows. It can also be changed to trigger based on event time, i.e. the timestamps of your elements.
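To illustrate the event-time idea, here is a hypothetical plain-Java sketch that splits one key's events into sessions using their own timestamps and a 10-second gap. Because sessions come from event timestamps rather than arrival time, the result does not depend on arrival order, which is one reason event time helps when replaying a stream from an earlier point. The sketch simply sorts a finite list; an unbounded stream cannot be sorted, so Flink relies on watermarks to decide when an event-time window is complete. `EventTimeSessions` and `sessionize` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical event-time sessionization for a single key (no Flink dependency):
// sort by timestamp, then start a new session whenever the gap exceeds gapMillis.
class EventTimeSessions {
    static List<List<Long>> sessionize(List<Long> timestamps, long gapMillis) {
        List<Long> sorted = new ArrayList<>(timestamps);
        sorted.sort(Comparator.naturalOrder());
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sorted) {
            // More than gapMillis since the previous event closes the current session.
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gapMillis) {
                sessions.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) sessions.add(current);
        return sessions;
    }

    public static void main(String[] args) {
        // Arrival order differs from event order; the 19 s gap before 31 s splits the session.
        System.out.println(sessionize(List.of(5_000L, 0L, 31_000L, 12_000L), 10_000));
        // [[0, 5000, 12000], [31000]]
    }
}
```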

aljoscha answered Apr 10 '26 12:04


