I have a keyed stream of data and need to compute counts over tumbling windows of different sizes (1 minute, 5 minutes, 1 day, 1 week).
Is it possible to compute all four window counts in a single application?
Yes, that's possible.
If you are using event-time, you can simply cascade the windows with increasing time intervals. So you do:
DataStream<String> data = ...
// append a Long 1 to each record to count it
// (AppendOne is a user-defined MapFunction<String, Tuple2<String, Long>>)
DataStream<Tuple2<String, Long>> withOnes = data.map(new AppendOne());
DataStream<Tuple2<String, Long>> oneMinCnts = withOnes
  // key by String field
  .keyBy(0)
  // define time window of 1 minute (TimeUnit is java.util.concurrent.TimeUnit)
  .timeWindow(Time.of(1, TimeUnit.MINUTES))
  // sum ones of the Long field
  // in practice you want to use an incrementally aggregating ReduceFunction and
  // a WindowFunction to extract the start/end timestamp of the window
  .sum(1);
// emit 1-min counts to wherever you need it
oneMinCnts.addSink(new YourSink());
// compute 5-min counts based on 1-min counts
DataStream<Tuple2<String, Long>> fiveMinCnts = oneMinCnts
  // key by String field
  .keyBy(0)
  // define time window of 5 minutes
  .timeWindow(Time.of(5, TimeUnit.MINUTES))
  // sum the 1-minute counts in the Long field
  .sum(1);
// emit 5-min counts to wherever you need it
fiveMinCnts.addSink(new YourSink());
// continue with 1 day window and 1 week window
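Note that this is possible because sum is an associative function (a sum can be computed from partial sums) and because the tumbling windows are aligned and do not overlap, so each larger window is covered exactly by a whole number of smaller windows.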
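The equivalence between cascaded and direct counting can be checked outside Flink. The following is a minimal plain-Java sketch, not Flink code: the class `CascadeCheck` and its methods are hypothetical names, and windows are assumed to be aligned to the epoch with non-negative timestamps. It counts events per 1-minute window, rolls those counts up into 5-minute windows, and compares the result with counting the raw events per 5-minute window.

```java
import java.util.Map;
import java.util.TreeMap;

class CascadeCheck {
    // Start of the epoch-aligned tumbling window containing ts (assumes ts >= 0).
    static long windowStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    // Count raw event timestamps per tumbling window of the given size.
    static Map<Long, Long> countDirect(long[] eventTimes, long sizeMs) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : eventTimes) {
            counts.merge(windowStart(ts, sizeMs), 1L, Long::sum);
        }
        return counts;
    }

    // Sum the counts of small windows (keyed by window start) into larger windows.
    static Map<Long, Long> rollUp(Map<Long, Long> smallCounts, long largeSizeMs) {
        Map<Long, Long> counts = new TreeMap<>();
        for (Map.Entry<Long, Long> e : smallCounts.entrySet()) {
            counts.merge(windowStart(e.getKey(), largeSizeMs), e.getValue(), Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long[] events = {5_000L, 59_999L, 60_000L, 250_000L, 299_999L, 300_000L, 610_000L};

        Map<Long, Long> oneMin = countDirect(events, minute);
        Map<Long, Long> fiveMinCascaded = rollUp(oneMin, 5 * minute);
        Map<Long, Long> fiveMinDirect = countDirect(events, 5 * minute);

        // Both ways of computing the 5-minute counts agree.
        System.out.println(fiveMinCascaded.equals(fiveMinDirect)); // prints "true"
    }
}
```

The sketch keys each partial count by its window start. In Flink, a record emitted by an event-time window carries the window's maximum timestamp (end minus one millisecond), which likewise falls inside the enclosing larger window.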
Regarding the comment on the incrementally aggregating ReduceFunction:
Usually, you want to have the start and/or end timestamp of the window in the output of a window operation (otherwise all results for the same key look the same). The start and end time of a window can be accessed from the window parameter of the apply() method of a WindowFunction. However, a WindowFunction does not incrementally aggregate records but collects them and aggregates the records at the end of the window. Hence, it is more efficient to use a ReduceFunction for incremental aggregation and a WindowFunction to append the start and/or end time of the window to the result. The documentation discusses the details.
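To make the difference concrete, here is a small plain-Java sketch of the two strategies. It does not use the Flink API: `IncrementalWindowCount`, `RunningCount`, and `WindowResult` are hypothetical names chosen for illustration. `RunningCount.add` stands in for the ReduceFunction (one running value per window), and `RunningCount.finish` stands in for the WindowFunction that attaches the window bounds.

```java
import java.util.Arrays;
import java.util.List;

class IncrementalWindowCount {
    // Output record: the count plus the window bounds that tell results apart.
    static final class WindowResult {
        final String key;
        final long count;
        final long start;
        final long end;

        WindowResult(String key, long count, long start, long end) {
            this.key = key;
            this.count = count;
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return key + "=" + count + " [" + start + ", " + end + ")";
        }
    }

    // Buffering strategy: every record is kept until the window closes, then summed.
    static WindowResult countBuffered(String key, List<Long> buffered, long start, long end) {
        long sum = 0;
        for (long value : buffered) {
            sum += value;
        }
        return new WindowResult(key, sum, start, end);
    }

    // Incremental strategy: the window state is a single running sum.
    static final class RunningCount {
        private long sum = 0;

        // Called once per arriving record (role of the ReduceFunction).
        void add(long value) {
            sum += value;
        }

        // Called once when the window closes (role of the WindowFunction).
        WindowResult finish(String key, long start, long end) {
            return new WindowResult(key, sum, start, end);
        }
    }

    public static void main(String[] args) {
        List<Long> ones = Arrays.asList(1L, 1L, 1L, 1L);

        RunningCount running = new RunningCount();
        for (long one : ones) {
            running.add(one);
        }

        System.out.println(countBuffered("a", ones, 0L, 60_000L));
        System.out.println(running.finish("a", 0L, 60_000L));
    }
}
```

Both strategies produce the same result, but the buffering one holds state proportional to the number of records in the window, while the incremental one holds a single value per key and window.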
If you want to compute this using processing time, you cannot cascade the windows but have to fan out from the input data stream to four window functions.