Let's say we have a TumblingEventTimeWindow with a size of 5 minutes, and events carrying two basic pieces of information: a value and an event timestamp.

In this example, we kick off our Flink topology at 12:00 PM worker-machine wall-clock time (of course, workers can have out-of-sync clocks, but that's outside the scope of this question). The topology contains one processing operator whose responsibility is to sum up the values of the events belonging to each window, plus a Kafka sink, which is irrelevant to this question.
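For concreteness, here is a minimal sketch of such a topology against the Flink 1.8 DataStream API. The (value, timestamp) tuple layout, the bounded stand-in source, the 2-minute out-of-orderness bound, and print() standing in for the Kafka sink are all my assumptions for illustration, not a definitive setup:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowSumJob {

    // event timestamps expressed as "minutes since midnight"; any epoch-millis
    // value works, since windows align the same way relative to the epoch
    private static long t(long minutesSinceMidnight) {
        return minutesSinceMidnight * 60_000L;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        env.fromElements(                              // stand-in for the real source
                Tuple2.of(5L, t(12 * 60 + 1)),         // (value=5, eventTime=12:01)
                Tuple2.of(7L, t(12 * 60 + 4)),         // (value=7, eventTime=12:04)
                Tuple2.of(3L, t(12 * 60 + 9)))         // (value=3, eventTime=12:09)
           .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple2<Long, Long>>(Time.minutes(2)) {
                    @Override
                    public long extractTimestamp(Tuple2<Long, Long> event) {
                        return event.f1;               // event time, not wall clock
                    }
                })
           .timeWindowAll(Time.minutes(5))             // 5-minute tumbling event time windows
           .sum(0)                                     // sum the values per window
           .print();                                   // stand-in for the Kafka sink

        env.execute("tumbling-window-sum");
    }
}
```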
In this scenario, several events arrive at the Flink operator with different event timestamps spanning 12:01 - 12:09. Also, the event timestamps are relatively aligned with our processing time (shown on the X axis below). Since we're dealing with the EVENT_TIME characteristic, whether or not an event belongs to a particular window should be determined via its event timestamp.
[Figure: events with timestamps 12:01 - 12:09 arriving in near real time, plotted against processing time on the X axis]
In that flow, I have assumed the boundaries of the two tumbling windows are 12:00 -- 12:05 and 12:05 -- 12:10, just because we kicked off the execution of the topology at 12:00. If that assumption is correct (and I hope it is not), then what happens in a back-filling situation, in which several old events come in with much older event timestamps and we have kicked off the topology at 12:00 again (old enough that our lateness allowance does not cover them)? Something like the following:
[Figure: the same topology, but back-filled events with much older timestamps arriving shortly after 12:00]
The answer to the previous question will address this as well, but I think it would be helpful to mention it explicitly here. Let's say I have this TumblingEventTimeWindow of size 5 minutes. Then at 12:00 I kick off a backfilling job which rushes many events into the Flink operator, with timestamps covering the range 10:02 - 10:59; but since this is a backfilling job, the whole execution takes about 3 minutes to finish.
Will the job allocate 12 separate windows and populate them correctly based on the events' event timestamps? What would be the boundaries of those 12 windows? And will I end up with 12 output events, each of which has the summed-up value of its allocated window?
I also have some concerns regarding automated testing of such logic and operators: what is the best way to manipulate processing time and trigger certain behaviors so as to shape the desired windows' boundaries for testing purposes? Especially since what I've read so far on leveraging test harnesses seems a bit confusing and can potentially lead to cluttered code that is not easy to read.
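For what it's worth, the pattern I keep coming back to (my own approach, not from the docs): since event time is driven entirely by the data, a bounded source plus the final Long.MAX_VALUE watermark Flink emits at end-of-input makes window tests deterministic without touching the wall clock at all:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowBoundaryTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1); // keep output ordering deterministic

        env.fromElements(
                Tuple2.of(5L, 601 * 60_000L),  // 10:01 -> window [10:00, 10:05)
                Tuple2.of(7L, 603 * 60_000L),  // 10:03 -> window [10:00, 10:05)
                Tuple2.of(3L, 606 * 60_000L))  // 10:06 -> window [10:05, 10:10)
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple2<Long, Long>>(Time.minutes(2)) {
                    @Override
                    public long extractTimestamp(Tuple2<Long, Long> e) { return e.f1; }
                })
            .timeWindowAll(Time.minutes(5))
            .sum(0)
            .print(); // expect sums 12 and 3, one per fired window

        // when the bounded source ends, Flink emits a Long.MAX_VALUE watermark,
        // which fires all pending windows regardless of wall-clock time
        env.execute("window-boundary-test");
    }
}
```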
Most of what I've learned in this area, and the source of some of my confusion, can be found in the Flink documentation on the different window types (Tumbling, Sliding, Session, and more). HUGE thanks in advance for your help, and if you know of any better references on these concepts and their inner workings, please let me know.
"If you run a Job with event time semantics, the processing time at the window operators is completely irrelevant."

That is correct, and I understand that part. Once you're dealing with the EVENT_TIME characteristic, you're pretty much divorced from processing time in your semantics/logic. The reason I brought up processing time was my confusion with regard to the following key question, which is still a mystery to me:
How are the windows' boundaries computed?!
Also, thanks a lot for clarifying the distinction between out-of-orderness and lateness. The code I was dealing with totally threw me off by having a misnomer (the constructor argument of a class inheriting from BoundedOutOfOrdernessTimestampExtractor was named maxLatency) :/
With that in mind, let me see if I can get this right with regard to how the watermark is computed and when an event will be discarded (or side-outputted). Depending on the configuration, the current watermark would be:

- with an out-of-orderness bound only: max-event-time-seen-so-far - max-out-of-orderness-allowed
- with allowed lateness only: max-event-time-seen-so-far - allowed-lateness
- with neither: max-event-time-seen-so-far

And in any of these cases, any event whose event timestamp is less than or equal to the current watermark will be discarded (side-outputted), correct?!
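For reference, the first formula matches what BoundedOutOfOrdernessTimestampExtractor does, except that the real class also clamps the watermark so it never moves backwards. A simplified restatement (field names mirror the real class, but this is a sketch, not the actual source):

```java
import org.apache.flink.streaming.api.watermark.Watermark;

// Simplified sketch of BoundedOutOfOrdernessTimestampExtractor's behavior.
abstract class BoundedOutOfOrdernessSketch<T> {
    private final long maxOutOfOrderness;            // millis, from the constructor
    private long currentMaxTimestamp;                // updated on every element
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    public final long onElement(T element) {
        long ts = extractTimestamp(element);
        currentMaxTimestamp = Math.max(ts, currentMaxTimestamp);
        return ts;
    }

    public final Watermark getCurrentWatermark() {
        // watermark = max-event-time-seen-so-far - max-out-of-orderness,
        // but it never regresses
        long potential = currentMaxTimestamp - maxOutOfOrderness;
        if (potential >= lastEmittedWatermark) {
            lastEmittedWatermark = potential;
        }
        return new Watermark(lastEmittedWatermark);
    }

    public abstract long extractTimestamp(T element);
}
```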
And this brings up a new question: when would you want to use out-of-orderness as opposed to lateness, given that the current watermark computation can (mathematically) be identical in both cases? And what happens when you use both (does that even make sense)?!
This is still the main mystery to me. Given all the discussion above, let's revisit the concrete example I provided and see how the windows' boundaries are determined there. Let's say we have the following scenario (events are in the shape of (value, timestamp)):

- a DataStream with a BoundedOutOfOrdernessTimestampExtractor which has a 2-minute maxOutOfOrderness
- an allowedLateness of 1 minute
NOTE: If you cannot have both out-of-orderness and lateness, or if that combination does not make sense, please only consider the out-of-orderness in the example above.
Finally, can you please lay out the windows which will have some events allocated to them, and specify the boundaries of those windows (the beginning and end timestamps of each window)? I'm assuming the boundaries are determined by the events' timestamps as well, but it's a bit tricky to figure them out in concrete examples like this one.
Again, HUGE thanks in advance; I truly appreciate your help :)
"Watermark: To my understanding, the watermark in Flink and Spark Structured Streaming is defined as (max-event-timestamp-seen-so-far - allowed-lateness). Any event whose event timestamp is less than or equal to this watermark will be discarded and ignored in result computations."
This is not correct and might be the source of the confusion. Out-of-orderness and lateness are different concepts in Flink. With the BoundedOutOfOrdernessTimestampExtractor, the watermark is max-event-timestamp-seen-so-far - max-out-of-orderness. More about allowed lateness in the Flink documentation [1].
If you run a job with event time semantics, the processing time at the window operators is completely irrelevant:

- events are assigned to their windows based on their event time timestamps
- time windows are fired once the watermark reaches their maximum timestamp (window end time - 1)
- events with a timestamp older than current watermark - allowed lateness are discarded or sent to the late data side output [1]

This means that if you start a job at 12:00pm (processing time) and start ingesting data from the past, the watermark will also be (even further) in the past. So, the configured allowedLateness is irrelevant, because the data is not late with respect to event time.
On the other hand, if you first ingest some data from 12:00pm and afterwards data from 10:00am, the watermark will have already advanced to ~12:00pm before you ingest the old data. In this case, the data from 10:00am will be "late". If it is later than the configured allowedLateness (default = 0), it is discarded (default) or sent to a side output (if configured) [1].
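A tiny simulation of that second scenario (plain Java, not Flink; the 2-minute out-of-orderness bound is taken from the example in the question). The point is that watermarks are monotonic: ingesting old data after new data cannot pull them back.

```java
public class WatermarkTrace {
    private static final long MIN = 60_000L; // one minute in millis

    public static void main(String[] args) {
        long[] eventTimes = {720 * MIN, 722 * MIN, 600 * MIN}; // 12:00, 12:02, then 10:00
        long maxSeen = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;
        for (long ts : eventTimes) {
            maxSeen = Math.max(maxSeen, ts);
            watermark = Math.max(watermark, maxSeen - 2 * MIN); // never regresses
            System.out.printf("event @ %d min -> watermark @ %d min%n", ts / MIN, watermark / MIN);
        }
        // After 12:02 arrives, the watermark sits at 12:00. The 10:00 event's window
        // ended at 10:05, roughly two hours behind the watermark, so only an
        // allowedLateness of about two hours could still save it.
    }
}
```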
The timeline for an event time window is the following:

1. the first element with a timestamp inside the window arrives -> state for this window is created
2. watermark >= window_endtime - 1 arrives -> the window is fired (results are emitted), but the state is not discarded
3. watermark >= window_endtime + allowed_lateness arrives -> the state is discarded

Between 2. and 3., events for this window are late, but within the allowed lateness. The events are added to the existing state and, per default, the window is fired on each record, emitting a refined result.

After 3., events for this window will be discarded (or sent to the late output sink).
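A compact restatement of steps 2 and 3 as code (a simplified sketch; the real checks live inside Flink's WindowOperator, and the method names below are illustrative):

```java
// Simplified sketch of the event time window lifecycle checks.
final class WindowLifecycle {

    // the last timestamp that still belongs to a window ending at windowEnd
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    // step 2: the window fires for the first time (results emitted, state kept)
    static boolean fires(long windowEnd, long watermark) {
        return watermark >= maxTimestamp(windowEnd);
    }

    // step 3: the state is discarded; from now on events for this window are
    // dropped or sent to the late-data side output
    static boolean stateDiscarded(long windowEnd, long allowedLateness, long watermark) {
        return watermark >= maxTimestamp(windowEnd) + allowedLateness;
    }
}
```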
So, yes, it makes sense to configure both. The out-of-orderness determines when the window is fired for the first time, while the allowed lateness determines how long the state is kept around to potentially update the results.
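In the API the two knobs also live in different places, which may help keep them apart. A sketch against the Flink 1.8 DataStream API, assuming (value, timestamp) tuples as in the question; windowedSums and the late-events tag are illustrative names:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

class BothKnobs {
    // events later than watermark + allowedLateness end up on this side output
    static final OutputTag<Tuple2<Long, Long>> LATE =
            new OutputTag<Tuple2<Long, Long>>("late-events") {};

    static SingleOutputStreamOperator<Tuple2<Long, Long>> windowedSums(
            DataStream<Tuple2<Long, Long>> events) {
        return events
            // out-of-orderness shapes the watermark, i.e. when a window fires first
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple2<Long, Long>>(Time.minutes(2)) {
                    @Override
                    public long extractTimestamp(Tuple2<Long, Long> e) {
                        return e.f1;
                    }
                })
            .timeWindowAll(Time.minutes(5))
            // lateness controls how long window state survives after the first firing
            .allowedLateness(Time.minutes(1))
            .sideOutputLateData(LATE)
            .sum(0);
        // callers can read the dropped events via result.getSideOutput(LATE)
    }
}
```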
Regarding the boundaries: tumbling event time windows have a fixed length, are aligned across keys, and start at the Unix epoch; empty windows don't exist. For your example, this means the 5-minute windows are aligned to multiples of five minutes regardless of when the job was started: 10:00:00 - 10:04:59.999, 10:05:00 - 10:09:59.999, ..., 10:55:00 - 10:59:59.999, i.e. twelve windows covering 10:02 - 10:59, each emitting one summed result.
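The alignment rule itself is one line of arithmetic; it mirrors TimeWindow.getWindowStartWithOffset() in the Flink source:

```java
// Window starts are multiples of windowSize (plus an optional offset),
// counted from the Unix epoch, all values in millis.
final class WindowMath {
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }
    // e.g. windowStart(10:02 in millis, 0, 5 minutes) = 10:00,
    // i.e. window [10:00, 10:05), no matter when the job itself was started
}
```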
Hope this helps.
Konstantin
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/windows.html#allowed-lateness