I have data in the following format:
SIP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|INVITE
RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|0
RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|1
RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|2
RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|3
RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|4
RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|5
RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|6
RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|7
RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|8
RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|9
SIP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016|BYE
I want my window to start when a SIP-INVITE message is encountered, and to trigger an event that performs some aggregations when a SIP-BYE message is encountered.
How do I do this? A SIP-INVITE message can arrive at any point in time for a given user, and SIP-INVITE messages for several users may arrive at the same time.
I think you can solve your use case with global windows keyed by user. Global windows collect all data per key and push the responsibility of triggering and purging the window to a user-defined Trigger function.
A global window is defined as follows:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows

val input: DataStream[(String, Int, String)] = ??? // (userId, value, marker)
val agg = input
  // one global window per user (handles overlapping SIP-INVITE events).
  .keyBy(_._1)
  // collect all data for each user until the trigger fires and purges the window.
  .window(GlobalWindows.create())
  // you have to implement a custom trigger which reacts to the marker.
  .trigger(new YourCustomTrigger())
  // the window function computes your aggregation.
  .apply(new YourWindowFunction())
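The pipeline above expects (userId, value, marker) tuples rather than raw lines, so the pipe-delimited records have to be mapped first. The following is only a sketch of one way to do that in plain Scala. The field layout (protocol|caller|callee|callId|timestamp|payload), the choice of the caller number as userId, and the names SipLines and toEvent are all assumptions made for illustration; the question does not name the fields.

```scala
import scala.util.Try

object SipLines {
  // Assumed layout: protocol|caller|callee|callId|timestamp|payload.
  // Returns (userId, value, marker), or None if the line does not match.
  def toEvent(line: String): Option[(String, Int, String)] =
    line.split('|') match {
      // SIP records carry the marker ("INVITE" or "BYE") in the last field.
      case Array("SIP", caller, _, _, _, marker) =>
        Some((caller, 0, marker))
      // RTP records carry a numeric value in the last field; "RTP" is used as marker.
      case Array("RTP", caller, _, _, _, seq) =>
        Try(seq.toInt).toOption.map(n => (caller, n, "RTP"))
      // Anything else (wrong field count, unknown protocol) is dropped.
      case _ => None
    }
}
```

With a DataStream[String] of raw lines, something like lines.flatMap(l => SipLines.toEvent(l).toList) would then produce the tuple stream used as input.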
I think a trigger which does the following should work (assuming that a SIP-INVITE event always starts a session). The Trigger.onElement() method should check each element for the SIP-BYE marker and, when it finds one, trigger a window evaluation and purge the window, i.e., return TriggerResult.FIRE_AND_PURGE. This calls the evaluation function and removes the window state.
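A minimal sketch of such a trigger could look like the following. It assumes the (userId, value, marker) tuple type from the snippet above and that the closing record carries the literal marker "BYE"; the class name SipByeTrigger is hypothetical. It ignores timers entirely, so it only suits in-order input.

```scala
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

// Hypothetical trigger: fires and purges the per-user global window on "BYE".
class SipByeTrigger extends Trigger[(String, Int, String), GlobalWindow] {

  // Called for every element added to the window.
  override def onElement(
      element: (String, Int, String),
      timestamp: Long,
      window: GlobalWindow,
      ctx: Trigger.TriggerContext): TriggerResult =
    if (element._3 == "BYE") TriggerResult.FIRE_AND_PURGE // evaluate, then drop state
    else TriggerResult.CONTINUE                            // keep collecting

  // No timers are registered, so the timer callbacks never need to fire.
  override def onProcessingTime(
      time: Long, window: GlobalWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def onEventTime(
      time: Long, window: GlobalWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  // Nothing to clean up: the trigger keeps no state and no timers.
  override def clear(window: GlobalWindow, ctx: Trigger.TriggerContext): Unit = ()
}
```

It would be plugged in as .trigger(new SipByeTrigger()) in place of YourCustomTrigger.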
Note that special attention is required if you want to support out-of-order events: in this case you should set an event-time timer to the timestamp of the closing element to ensure that all data earlier than that timestamp has been received. If there is data that should be discarded because it is not "between" SIP-INVITE and SIP-BYE, you need to handle that as well.
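The event-time variant described in the note might be sketched as follows. This is an illustration under assumptions, not a complete solution: it assumes timestamps and watermarks are assigned upstream, it reuses the (userId, value, marker) tuple type with a literal "BYE" marker, and the class name is hypothetical. It does not filter out records that fall outside an INVITE/BYE pair, and records with a later timestamp that arrive before the watermark passes the BYE timestamp would still end up in the same window.

```scala
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

// Hypothetical trigger: defers firing until the watermark reaches the BYE timestamp.
class SipByeEventTimeTrigger extends Trigger[(String, Int, String), GlobalWindow] {

  override def onElement(
      element: (String, Int, String),
      timestamp: Long,
      window: GlobalWindow,
      ctx: Trigger.TriggerContext): TriggerResult = {
    // On the closing element, ask to be called back once the watermark
    // passes its timestamp, so earlier records that are still in flight can arrive.
    if (element._3 == "BYE") ctx.registerEventTimeTimer(timestamp)
    TriggerResult.CONTINUE
  }

  // The only timer ever registered is the one for BYE, so firing here
  // evaluates the window and removes its contents.
  override def onEventTime(
      time: Long, window: GlobalWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.FIRE_AND_PURGE

  // Processing-time timers are not used.
  override def onProcessingTime(
      time: Long, window: GlobalWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  // The sketch does not track the timer timestamp, so there is nothing to delete here.
  override def clear(window: GlobalWindow, ctx: Trigger.TriggerContext): Unit = ()
}
```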
For details see the documentation of global windows and triggers, the JavaDocs of Trigger, and this blog post.