We migrated from Google Dataflow 1.9 to Apache Beam 0.6, and we are noticing a change in the behavior of the timestamps after applying the GlobalWindow. In Google Dataflow 1.9, we would get the expected timestamps in the DoFn after the windowing/combine step. Now we get some huge value for the timestamp, e.g. 9223371950454775. Did the default behavior for the GlobalWindow change in the Apache Beam version?
input.apply(name(id, "Assign To Shard"), ParDo.of(new AssignToTest()))
    .apply(name(id, "Window"), Window
        .<KV<Long, ObjectNode>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(
            AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(1))))
        .discardingFiredPanes())
    .apply(name(id, "Group By Shard"), GroupByKey.create())
    .apply(.....)
TL;DR: When you are combining a bunch of timestamped values, you need to choose a timestamp for the result of the aggregation. There are multiple good answers for this output timestamp. In Dataflow 1.x the default was the minimum of the input timestamps. Based on our experience with 1.x, in Beam the default was changed to the end of the window. You can restore the prior behavior by adding .withTimestampCombiner(TimestampCombiner.EARLIEST) to your Window transform.
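As a minimal sketch of what that looks like on a windowing step shaped like yours (AssignToTest, ObjectNode, and the name(id, ...) helper are from your snippet; the withAllowedLateness call is my assumption, since setting a trigger in Beam generally also requires setting allowed lateness):

import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

// Same GlobalWindows + processing-time trigger as before, but with the output
// timestamp of each grouped element pinned to the minimum of its input
// timestamps, which is what Dataflow 1.x did by default.
input.apply(name(id, "Assign To Shard"), ParDo.of(new AssignToTest()))
    .apply(name(id, "Window"), Window
        .<KV<Long, ObjectNode>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(
            AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(1))))
        .discardingFiredPanes()
        .withAllowedLateness(Duration.ZERO)
        .withTimestampCombiner(TimestampCombiner.EARLIEST))
    .apply(name(id, "Group By Shard"), GroupByKey.create());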
I'll unpack this. Let's use the @ sign to pair up a value and its timestamp. Focusing on just one key, you have timestamped values v1@t1, v2@t2, etc. I will stick with your example of a raw GroupByKey even though this also applies to other ways of combining the values. So the output iterable of values is [v1, v2, ...] in arbitrary order.
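To make the v@t notation concrete, here is a hedged sketch of building such timestamped input in Beam (the pipeline variable and the literal instants are placeholders of mine):

import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;

// v1@t1 and v2@t2 for the same key: after a GroupByKey, key 1L maps to the
// iterable [v1, v2] (in arbitrary order), and that single grouped element
// needs one output timestamp chosen from t1, t2, or the end of the window.
PCollection<KV<Long, String>> timestamped = pipeline.apply(
    Create.timestamped(
        TimestampedValue.of(KV.of(1L, "v1"), new Instant(1000L)),    // v1@t1
        TimestampedValue.of(KV.of(1L, "v2"), new Instant(2000L))));  // v2@t2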
Here are some possibilities for the timestamp:

- the minimum of the input timestamps, min(t1, t2, ...)
- the maximum of the input timestamps, max(t1, t2, ...)
- the end of the window the elements are in

All of these are correct. These are all available as options for your OutputTimeFn in Dataflow 1.x and TimestampCombiner in Apache Beam.
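In Beam those three options correspond to the values of the TimestampCombiner enum, roughly:

import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;

// The three output-timestamp choices in Apache Beam.
TimestampCombiner earliest = TimestampCombiner.EARLIEST;           // min(t1, t2, ...)
TimestampCombiner latest = TimestampCombiner.LATEST;               // max(t1, t2, ...)
TimestampCombiner endOfWindow = TimestampCombiner.END_OF_WINDOW;   // end of the window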
The timestamps have different interpretations, and they are useful for different things. The output time of the aggregated value governs the downstream watermark, so choosing earlier timestamps holds the downstream watermark back more, while later timestamps allow it to move ahead.
For these reasons (in particular, the end of the window reflects that the aggregation covers all of the window's data and lets the downstream watermark advance as far as possible), we switched the default from the minimum of the input timestamps to the end of the window.
In Beam, you can restore the prior behavior by adding .withTimestampCombiner(TimestampCombiner.EARLIEST) to your Window transform. In Dataflow 1.x you can migrate to Beam's default by adding .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()).
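A hedged sketch of the Dataflow 1.x side (class names are from the 1.x SDK; the rest of your window setup stays as it is):

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFns;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.KV;

// Dataflow 1.x: opt in to what becomes the Beam default, so the migration
// does not change your output timestamps.
input.apply(Window.<KV<Long, ObjectNode>>into(new GlobalWindows())
    .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()));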
Another technicality is that the user-defined OutputTimeFn is removed and replaced by the TimestampCombiner enum, so there are only these three choices, not a whole API for writing your own.