 

Apache Beam: why is the timestamp of aggregate value in Global Window 9223371950454775?

We migrated from Google Dataflow 1.9 to Apache Beam 0.6. We are noticing a change in the behavior of the timestamps after applying a GlobalWindow. In Google Dataflow 1.9, we would get the correct timestamps in the DoFn after the windowing/combine function. Now we get a huge value for the timestamp, e.g. 9223371950454775. Did the default behavior of the GlobalWindow change in the Apache Beam version?

input.apply(name(id, "Assign To Shard"), ParDo.of(new AssignToTest()))
      .apply(name(id, "Window"), Window
          .<KV<Long, ObjectNode>>into(new GlobalWindows())
          .triggering(Repeatedly.forever(
              AfterProcessingTime
                  .pastFirstElementInPane()
                  .plusDelayOf(Duration.standardMinutes(1))))
          .discardingFiredPanes())
      .apply(name(id, "Group By Shard"), GroupByKey.create())
      .apply(.....)
asked Mar 09 '23 by nokostar

1 Answer

TL;DR: When you are combining a bunch of timestamped values, you need to choose a timestamp for the result of the aggregation. There are multiple good answers for this output timestamp. In Dataflow 1.x the default was the minimum of the input timestamps. Based on our experience with 1.x, in Beam the default was changed to the end of the window. For the GlobalWindow, the end of the window is its maximum timestamp, which is 9223371950454775 in milliseconds, i.e. the huge value you are seeing. You can restore the prior behavior by adding .withTimestampCombiner(TimestampCombiner.EARLIEST) to your Window transform.
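Applied to the windowing in the question, that is a one-line addition (a minimal sketch; the name(id, ...) helper, the types, and the trigger are taken from your snippet, and TimestampCombiner comes from org.apache.beam.sdk.transforms.windowing):

    input.apply(name(id, "Assign To Shard"), ParDo.of(new AssignToTest()))
        .apply(name(id, "Window"), Window
            .<KV<Long, ObjectNode>>into(new GlobalWindows())
            .triggering(Repeatedly.forever(
                AfterProcessingTime
                    .pastFirstElementInPane()
                    .plusDelayOf(Duration.standardMinutes(1))))
            .discardingFiredPanes()
            // Restore the Dataflow 1.x behavior: output at min(t1, t2, ...)
            .withTimestampCombiner(TimestampCombiner.EARLIEST))
        .apply(name(id, "Group By Shard"), GroupByKey.create());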


I'll unpack this. Let's use the @ sign to pair up a value and its timestamp. Focusing on just one key, you have timestamped values v1@t1, v2@t2, ..., etc. I will stick with your example of a raw GroupByKey even though this also applies to other ways of combining the values. So the output iterable of values is [v1, v2, ...] in arbitrary order.
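As a concrete way to observe this, you can log the element timestamp in a DoFn right after the GroupByKey (a minimal sketch, reusing the KV<Long, ObjectNode> types from the question):

    // Sketch: inspect the timestamp assigned to each grouped value.
    static class LogTimestampFn extends DoFn<KV<Long, Iterable<ObjectNode>>, Void> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // With the Beam default (end of window) and GlobalWindows, this prints
        // 9223371950454775, i.e. GlobalWindow.maxTimestamp() in milliseconds.
        System.out.println(
            "key " + c.element().getKey() + " @ " + c.timestamp().getMillis());
      }
    }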

Here are some possibilities for the timestamp:

  • min(t1, t2, ...)
  • max(t1, t2, ...)
  • the end of the window these elements are in (ignoring input timestamps)

All of these are correct. These are all available as options for your OutputTimeFn in Dataflow 1.x and TimestampCombiner in Apache Beam.
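In Beam these map onto the TimestampCombiner enum constants (a sketch of the selection, where window is the Window.into(...) transform from the question; EARLIEST, LATEST, and END_OF_WINDOW are the constants in org.apache.beam.sdk.transforms.windowing.TimestampCombiner):

    // min(t1, t2, ...) -- the Dataflow 1.x default
    window.withTimestampCombiner(TimestampCombiner.EARLIEST)
    // max(t1, t2, ...)
    window.withTimestampCombiner(TimestampCombiner.LATEST)
    // end of the window -- the Beam default
    window.withTimestampCombiner(TimestampCombiner.END_OF_WINDOW)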

The timestamps have different interpretations and they are useful for different things. The output time of the aggregated value governs the downstream watermark. So choosing earlier timestamps holds the downstream watermark back more, while later timestamps allow it to move ahead.

  • min(t1, t2, ...) allows you to unpack the iterable and re-output v1@t1 (see the sketch after this list)
  • max(t1, t2, ...) accurately models the logical time that the aggregated value was fully available. Max does tend to be the most expensive, for reasons to do with implementation details.
  • end of the window:
    • models the fact that this aggregation represents all the data for the window
    • is very easy to understand
    • allows downstream watermarks to advance as fast as possible
    • is extremely efficient
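To make the first point concrete, here is a minimal sketch of re-emitting each value at its original timestamp after the GroupByKey; it assumes (hypothetically) that each ObjectNode carries its original event timestamp in a "ts" field stored before the grouping:

    static class ReEmitAtOriginalTimeFn
        extends DoFn<KV<Long, Iterable<ObjectNode>>, KV<Long, ObjectNode>> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        for (ObjectNode value : c.element().getValue()) {
          // "ts" is a hypothetical field holding the value's original timestamp in millis.
          Instant originalTimestamp = new Instant(value.get("ts").asLong());
          // Legal only because TimestampCombiner.EARLIEST makes c.timestamp() the
          // minimum input timestamp, so no original timestamp is earlier than it.
          c.outputWithTimestamp(KV.of(c.element().getKey(), value), originalTimestamp);
        }
      }
    }

With the end-of-window default, the same outputWithTimestamp calls would be rejected, because the original timestamps are earlier than the element's timestamp minus the allowed skew (which defaults to zero).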

For all of these reasons, we switched the default from the minimum of the input timestamps to the end of the window.

In Beam, you can restore the prior behavior by adding .withTimestampCombiner(TimestampCombiner.EARLIEST) to your Window transform. In Dataflow 1.x you can migrate to Beam's defaults by adding .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()).
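For the Dataflow 1.x side, that is the withOutputTimeFn call on the same Window transform (a sketch; OutputTimeFns.outputAtEndOfWindow() is the 1.x counterpart of TimestampCombiner.END_OF_WINDOW, and the chaining after into() is assumed to match the 1.x builder):

    // Dataflow 1.x: opt in to what is now the Beam default (end of window).
    Window.<KV<Long, ObjectNode>>into(new GlobalWindows())
        .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow())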

Another technicality is that the user-defined OutputTimeFn is removed and replaced by the TimestampCombiner enum, so there are only these three choices, not a whole API to write your own.

answered Mar 11 '23 by Kenn Knowles