SlidingWindows for slow data (big intervals) on Apache Beam

I am working with the Chicago Traffic Tracker dataset, where new data is published every 15 minutes. Each new batch contains records that lag "real time" by 10-15 minutes (for example, look at _last_updt).

For example, at 00:20 I get data timestamped 00:10; at 00:35, data from 00:20; at 00:50, data from 00:40. So new data arrives at a fixed interval (every 15 minutes), although the interval between the data timestamps varies slightly.
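The arithmetic in this example can be checked with a small, self-contained sketch using plain java.time (no Beam involved; the class and method names are illustrative only). It computes the lag between each arrival and the data it carries, and the gaps between consecutive arrivals versus consecutive data timestamps. For the three samples above, the lags are 10, 15 and 10 minutes, the arrival gaps are a constant 15 minutes, and the data-timestamp gaps are 10 and 20 minutes.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;

// Illustrative helper (hypothetical names) for the arrival/timestamp example above.
class FeedLag {

    // Minutes between the data's own timestamp and the moment the batch arrives.
    static long lagMinutes(LocalTime arrival, LocalTime dataTimestamp) {
        return Duration.between(dataTimestamp, arrival).toMinutes();
    }

    // Gaps, in minutes, between consecutive times in the list.
    static List<Long> gapsMinutes(List<LocalTime> times) {
        List<Long> gaps = new ArrayList<>();
        for (int i = 1; i < times.size(); i++) {
            gaps.add(Duration.between(times.get(i - 1), times.get(i)).toMinutes());
        }
        return gaps;
    }

    public static void main(String[] args) {
        // Arrival times and data timestamps from the example in the question.
        List<LocalTime> arrivals = List.of(LocalTime.of(0, 20), LocalTime.of(0, 35), LocalTime.of(0, 50));
        List<LocalTime> dataTs = List.of(LocalTime.of(0, 10), LocalTime.of(0, 20), LocalTime.of(0, 40));
        for (int i = 0; i < arrivals.size(); i++) {
            System.out.println(arrivals.get(i) + " -> data from " + dataTs.get(i)
                + " (lag " + lagMinutes(arrivals.get(i), dataTs.get(i)) + " min)");
        }
        System.out.println("arrival gaps:   " + gapsMinutes(arrivals));
        System.out.println("timestamp gaps: " + gapsMinutes(dataTs));
    }
}
```

Because Beam assigns windows by event timestamp rather than arrival time, it is the uneven timestamp gaps, not the regular 15-minute arrivals, that decide which windows each record falls into.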

I am trying to consume this data on Dataflow (Apache Beam), and for that I am experimenting with sliding windows. My idea is to collect and work on 4 consecutive datapoints (4 x 15 min = 60 min), and ideally to update my sums/averages as soon as a new datapoint is available. I started with this code:

PCollection<TrafficData> trafficData = input        
    .apply("MapIntoSlidingWindows", Window.<TrafficData>into(
        SlidingWindows.of(Duration.standardMinutes(60)) // (4x15)
            .every(Duration.standardMinutes(15)))       // interval to get new data
        .triggering(AfterWatermark
                        .pastEndOfWindow()
                        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()))
        .withAllowedLateness(Duration.ZERO)
        .accumulatingFiredPanes());
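As a rough illustration of what SlidingWindows.of(60 min).every(15 min) does with this feed, here is a small dependency-free Java model (not Beam code; the class and method names are made up for illustration). It assumes windows are aligned to multiples of the period with no offset, which as far as I know is Beam's default, and assigns each event timestamp to every 60-minute window that contains it. Times are minutes since midnight.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, dependency-free model of sliding-window assignment; this is NOT Beam code.
// Times are minutes since midnight; a window [start, end) is stored as long[]{start, end}.
class SlidingWindowModel {

    // Returns every window of length `size` (starting on multiples of `period`)
    // that contains `timestamp`, latest window first.
    static List<long[]> assignWindows(long timestamp, long size, long period) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start at or before the timestamp, aligned to the period.
        long lastStart = timestamp - Math.floorMod(timestamp, period);
        // Step back one period at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= period) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    // Formats minutes as HH:MM, wrapping negative values onto the previous day.
    static String fmt(long minutes) {
        long m = Math.floorMod(minutes, 24L * 60);
        return String.format("%02d:%02d", m / 60, m % 60);
    }

    public static void main(String[] args) {
        long size = 60;   // mirrors SlidingWindows.of(Duration.standardMinutes(60))
        long period = 15; // mirrors .every(Duration.standardMinutes(15))
        // Data timestamps from the example in the question.
        for (long ts : new long[] {10, 20, 40}) {
            StringBuilder line = new StringBuilder("data at " + fmt(ts) + " ->");
            for (long[] w : assignWindows(ts, size, period)) {
                line.append(" [").append(fmt(w[0])).append(", ").append(fmt(w[1])).append(")");
            }
            System.out.println(line);
        }
    }
}
```

Each datapoint lands in 60 / 15 = 4 overlapping windows; with these timestamps, [23:45, 00:45) and [00:00, 01:00) are the windows that contain all three example points, so each of them only sees the full set once the 00:40 record has been assigned.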

Unfortunately, it looks like when I receive a new datapoint from my input, I do not get a new (updated) result from the GroupByKey that follows.

Is there something wrong with my SlidingWindows? Or am I missing something else?

tyron asked May 29 '18 07:05




1 Answer

One issue may be that the watermark passes the end of the window, after which all later elements are dropped, since the allowed lateness is zero. You could try allowing a few minutes of lateness after the watermark passes, and adding late firings:

PCollection<TrafficData> trafficData = input        
    .apply("MapIntoSlidingWindows", Window.<TrafficData>into(
        SlidingWindows.of(Duration.standardMinutes(60)) // (4x15)
            .every(Duration.standardMinutes(15)))       // interval to get new data
        .triggering(AfterWatermark
                        .pastEndOfWindow()
                        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
                        .withLateFirings(AfterProcessingTime.pastFirstElementInPane()))  // fire again for late data
        .withAllowedLateness(Duration.standardMinutes(15))  // keep window state 15 min past the watermark
        .accumulatingFiredPanes());
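To see why the allowed lateness matters, here is a deliberately simplified model (not Beam's implementation; the names are illustrative) of what happens to an element that reaches the GroupByKey while the watermark is at a given point: before the window's end it is on time, within the allowed lateness it is kept and emitted by a late firing, and after that the window has expired and the element is dropped. Times are minutes since midnight, and Beam's millisecond-level boundary details are ignored.

```java
// Deliberately simplified model (NOT Beam's implementation) of how allowed lateness
// decides what happens to an element for a given window. Times are minutes since midnight.
class LatenessModel {

    enum Outcome { ON_TIME, LATE_BUT_KEPT, DROPPED }

    // windowEnd: exclusive end of the window; watermark: input watermark when the element arrives.
    static Outcome classify(long windowEnd, long allowedLateness, long watermark) {
        if (watermark < windowEnd) {
            return Outcome.ON_TIME;       // watermark has not passed the end of the window
        }
        if (watermark < windowEnd + allowedLateness) {
            return Outcome.LATE_BUT_KEPT; // window state still retained; a late firing emits it
        }
        return Outcome.DROPPED;           // window has expired; the element is discarded
    }

    public static void main(String[] args) {
        long windowEnd = 60; // window [00:00, 01:00)
        long watermark = 70; // element shows up after the watermark has reached 01:10
        System.out.println("allowed lateness 0 min:  " + classify(windowEnd, 0, watermark));
        System.out.println("allowed lateness 15 min: " + classify(windowEnd, 15, watermark));
    }
}
```

In this model, the zero lateness from the question drops such an element, while 15 minutes of allowed lateness keeps it and lets withLateFirings produce an updated pane.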

Let me know if this helps at all.

Pablo answered Nov 03 '22 18:11