
Flink Streaming: How to implement windows which are defined by a start and end element?

I have data in the following format,

SIP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|INVITE
RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|0
RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|1
RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|2
RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|3
RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|4
RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|5
RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|6
RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|7
RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|8
RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|9
SIP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|BYE

I want my window to start when a SIP-INVITE message is encountered and trigger an event when a SIP-BYE message is encountered, performing some aggregations.

How do I do this? The SIP-INVITE message comes at any point in time for a given user, and I might also have multiple SIP-INVITE messages for multiple users coming at the same time.

Priya Ravichander asked Nov 09 '16 06:11


1 Answer

I think you can solve your use case with global windows keyed by user. Global windows collect all data per key and push the responsibility of triggering and purging the window to a user-defined Trigger function.

A global window is defined as follows:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows

val input: DataStream[(String, Int, String)] = ??? // (userId, value, marker)
val agg = input
  // one global window per user (handles overlapping SIP-INVITE events).
  .keyBy(_._1)
  // collect all data for each user until the trigger fires and purges the window.
  .window(GlobalWindows.create())
  // you have to implement a custom trigger which reacts on the marker.
  .trigger(new YourCustomTrigger())
  // the window function computes your aggregation.
  .apply(new YourWindowFunction())
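
As an illustration of what YourWindowFunction could look like, here is a minimal sketch. It assumes a Flink version that still ships the Scala DataStream API and the (userId, value, marker) tuple from the snippet above. The class name SessionSummary and the aggregation itself (element count and sum of values) are hypothetical, since the question does not say which aggregations are needed.

```scala
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

// Hypothetical aggregation: emits (userId, number of elements, sum of values)
// for everything collected in the window when the trigger fires.
class SessionSummary
    extends WindowFunction[(String, Int, String), (String, Int, Int), String, GlobalWindow] {

  override def apply(
      key: String,                           // the userId selected by keyBy(_._1)
      window: GlobalWindow,
      input: Iterable[(String, Int, String)],
      out: Collector[(String, Int, Int)]): Unit = {
    // The input also contains the SIP-INVITE and SIP-BYE elements themselves.
    val values = input.map(_._2)
    out.collect((key, values.size, values.sum))
  }
}
```

It would be plugged in with .apply(new SessionSummary()) in place of YourWindowFunction.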

I think a trigger that does the following should work (assuming that a SIP-INVITE event always starts a session): the Trigger.onElement() method should check for the SIP-BYE marker and, when it sees one, return TriggerResult.FIRE_AND_PURGE. This calls the window function and then removes the window state.
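
A minimal sketch of such a trigger might look like this. It assumes the (userId, value, marker) tuple from the snippet above and that the closing element carries the marker string "BYE"; both the class name SipByeTrigger and that marker value are assumptions, so adapt them to your actual record type.

```scala
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

// Fires and purges the per-user global window as soon as a "BYE" marker arrives.
class SipByeTrigger extends Trigger[(String, Int, String), GlobalWindow] {

  override def onElement(
      element: (String, Int, String),
      timestamp: Long,
      window: GlobalWindow,
      ctx: Trigger.TriggerContext): TriggerResult =
    // Assumption: the third tuple field holds the marker, "BYE" closes a session.
    if (element._3 == "BYE") TriggerResult.FIRE_AND_PURGE
    else TriggerResult.CONTINUE

  // This sketch registers no timers, so the timer callbacks never need to fire.
  override def onProcessingTime(
      time: Long, window: GlobalWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def onEventTime(
      time: Long, window: GlobalWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  // The trigger keeps no state of its own, so there is nothing to clean up.
  override def clear(window: GlobalWindow, ctx: Trigger.TriggerContext): Unit = ()
}
```

It would be used as .trigger(new SipByeTrigger()) in place of YourCustomTrigger. Because it fires immediately on the closing element, it only gives correct results when events arrive in order.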

Note that special attention is required if you want to support out-of-order events. In that case, you should set an event-time timer to the timestamp of the closing element to ensure that all data preceding that timestamp has been received. If there is data that should be discarded because it is not "between" SIP-INVITE and SIP-BYE, you need to handle that as well.
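
The event-time variant could be sketched roughly as follows. This assumes the job runs in event time with timestamps and watermarks assigned upstream, the same (userId, value, marker) tuple, and a "BYE" marker string; the class name SipByeEventTimeTrigger is hypothetical. It does not filter out elements that fall outside an INVITE/BYE pair, so that part still has to be handled separately.

```scala
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

// Defers the window evaluation until the watermark reaches the BYE timestamp.
class SipByeEventTimeTrigger extends Trigger[(String, Int, String), GlobalWindow] {

  override def onElement(
      element: (String, Int, String),
      timestamp: Long,
      window: GlobalWindow,
      ctx: Trigger.TriggerContext): TriggerResult = {
    // Assumption: "BYE" marks the closing element. Instead of firing right away,
    // register a timer at its event timestamp and keep collecting.
    if (element._3 == "BYE") ctx.registerEventTimeTimer(timestamp)
    TriggerResult.CONTINUE
  }

  // Called once the watermark has reached the registered timestamp.
  override def onEventTime(
      time: Long, window: GlobalWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.FIRE_AND_PURGE

  override def onProcessingTime(
      time: Long, window: GlobalWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  // Simplification: a pending timer is not deleted here, because this sketch
  // does not remember the timestamp it registered.
  override def clear(window: GlobalWindow, ctx: Trigger.TriggerContext): Unit = ()
}
```

One limitation of this sketch: elements of a following session that arrive before the timer fires end up in the same window evaluation.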

For details, see the documentation of global windows and triggers, the JavaDocs of Trigger, and this blog post.

Fabian Hueske answered Oct 13 '22 10:10