We are attempting to use fixed windows on an Apache Beam pipeline (using DirectRunner). Our flow is as follows:
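1. Read an unbounded PCollection of strings from a Pub/Sub topic
2. Deserialize each JSON string into an Event object
3. Window the Events into fixed windows of 5 seconds
4. Using a custom CombineFn, combine each window of Events into a List<Event>
5. Log the resulting List<Event>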
Pipeline code:
pipeline
    // Read from pubsub topic to create unbounded PCollection
    .apply(PubsubIO
        .<String>read()
        .topic(options.getTopic())
        .withCoder(StringUtf8Coder.of())
    )
    // Deserialize JSON into Event object
    .apply("ParseEvent", ParDo
        .of(new ParseEventFn())
    )
    // Window events with a fixed window size of 5 seconds
    .apply("Window", Window
        .<Event>into(FixedWindows
            .of(Duration.standardSeconds(5))
        )
    )
    // Group events by window
    .apply("CombineEvents", Combine
        .globally(new CombineEventsFn())
        .withoutDefaults()
    )
    // Log grouped events
    .apply("LogEvent", ParDo
        .of(new LogEventFn())
    );
The result we are seeing is that the final step is never run, as we don't get any logging.
Also, we have added System.out.println("***") in each method of our custom CombineFn class, in order to track when these are run, and it seems they don't run either.
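For readers unfamiliar with where those println calls would sit: Beam's CombineFn has four methods (createAccumulator, addInput, mergeAccumulators, extractOutput). The following is only a plain-Java sketch of that lifecycle, not Beam code and not our actual CombineEventsFn. ListCombiner and CombineSketch are hypothetical names, and String stands in for Event so the example has no dependencies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in that mirrors the four method names of Beam's CombineFn.
// It does not extend any Beam class; it only illustrates the call order.
class ListCombiner<T> {
    // Starts a fresh, empty accumulator.
    List<T> createAccumulator() {
        System.out.println("*** createAccumulator");
        return new ArrayList<>();
    }

    // Folds one input element into an accumulator.
    List<T> addInput(List<T> accumulator, T input) {
        System.out.println("*** addInput " + input);
        accumulator.add(input);
        return accumulator;
    }

    // Merges partial accumulators into a single one.
    List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
        System.out.println("*** mergeAccumulators");
        List<T> merged = new ArrayList<>();
        for (List<T> acc : accumulators) {
            merged.addAll(acc);
        }
        return merged;
    }

    // Converts the final accumulator into the output value.
    List<T> extractOutput(List<T> accumulator) {
        System.out.println("*** extractOutput");
        return accumulator;
    }
}

class CombineSketch {
    // Hypothetical driver: builds two partial accumulators, then merges them,
    // roughly as a runner might when inputs are spread across bundles.
    static List<String> combine(List<String> inputs) {
        ListCombiner<String> fn = new ListCombiner<>();
        List<String> left = fn.createAccumulator();
        List<String> right = fn.createAccumulator();
        for (int i = 0; i < inputs.size(); i++) {
            if (i % 2 == 0) {
                fn.addInput(left, inputs.get(i));
            } else {
                fn.addInput(right, inputs.get(i));
            }
        }
        return fn.extractOutput(fn.mergeAccumulators(Arrays.asList(left, right)));
    }

    public static void main(String[] args) {
        System.out.println(combine(Arrays.asList("e1", "e2", "e3")));
    }
}
```

In a real pipeline the runner decides when and how often each method is called, so if none of the println lines appear, the combine step is probably never being asked to produce output at all.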
Is windowing set up incorrectly here? We followed an example found at https://beam.apache.org/documentation/programming-guide/#windowing and it seems fairly straightforward, but clearly there is something fundamental missing.
Any insight is appreciated - thanks in advance!
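Some context on the 5-second FixedWindows used above: fixed windows are assigned from each element's event timestamp, and with Beam's default trigger a window only emits once the watermark passes the end of that window. The sketch below shows just the assignment arithmetic in plain Java. It is an illustration under the assumption of a zero window offset, not Beam's implementation, and FixedWindowSketch and windowStart are hypothetical names.

```java
// Plain-Java illustration of fixed-window assignment by event timestamp.
// Assumes a zero offset; not taken from Beam's source.
class FixedWindowSketch {
    // Returns the inclusive start of the fixed window containing the timestamp.
    // floorMod keeps the result correct for negative timestamps as well.
    static long windowStart(long eventTimeMillis, long sizeMillis) {
        return eventTimeMillis - Math.floorMod(eventTimeMillis, sizeMillis);
    }

    public static void main(String[] args) {
        long size = 5_000L; // 5-second windows, as in the pipeline above
        for (long ts : new long[] {0L, 4_999L, 5_000L, 12_345L}) {
            long start = windowStart(ts, size);
            System.out.println(ts + " -> [" + start + ", " + (start + size) + ")");
        }
    }
}
```

The point is that the window an element lands in depends on its timestamp, not on when it arrives, which is why nothing is emitted until the runner believes event time has moved past the window's end.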
Looks like the main issue was indeed a missing trigger - the window was opening and there was nothing telling it when to emit results. We wanted to simply window based on processing time (not event time) and so did the following:
.apply("Window", Window
    .<Event>into(new GlobalWindows())
    .triggering(Repeatedly
        .forever(AfterProcessingTime
            .pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(5))
        )
    )
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes()
)
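Essentially this places every element in the single global window and attaches a repeating trigger: a pane of results is emitted 5 seconds (in processing time) after the first element of that pane is processed. The global window itself never closes; after each firing the trigger resets, and the next element to arrive starts a new pane. Because of discardingFiredPanes(), each pane contains only the elements received since the previous firing. Beam complained when we didn't have the withAllowedLateness piece - as far as I know, setting it to Duration.ZERO just tells it to drop any late data.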
My understanding may be a bit off the mark here, but the above snippet has solved our problem!
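To make the firing behaviour more concrete, here is a rough plain-Java simulation of how elements would be grouped into panes under a "5 seconds after the first element in the pane" rule with discarded panes. It is a sketch of my understanding, not Beam code: PaneSketch and panes are hypothetical names, arrival times are assumed to be sorted processing times in milliseconds, and an element arriving exactly at the deadline is assumed to start the next pane.

```java
import java.util.ArrayList;
import java.util.List;

// Simulates grouping of arrivals into panes; does not use or model Beam internals.
class PaneSketch {
    static List<List<Long>> panes(long[] arrivalsMillis, long delayMillis) {
        List<List<Long>> fired = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long deadline = 0L;
        for (long t : arrivalsMillis) {
            // The pending pane fires once its deadline has been reached.
            if (!current.isEmpty() && t >= deadline) {
                fired.add(current);
                current = new ArrayList<>(); // discarding mode: start from empty
            }
            // The first element of a pane sets that pane's firing deadline.
            if (current.isEmpty()) {
                deadline = t + delayMillis;
            }
            current.add(t);
        }
        // Simplification: flush the last pane at end of input. In a running
        // pipeline it would fire when its own deadline passes.
        if (!current.isEmpty()) {
            fired.add(current);
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] arrivals = {0L, 1_000L, 4_000L, 6_000L, 7_000L, 20_000L};
        System.out.println(panes(arrivals, 5_000L));
    }
}
```

With these arrivals the elements fall into three panes: the first three, then the next two, then the last one on its own. Note that the pane boundaries depend on arrival times only, which is the processing-time behaviour we were after.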