I'm stuck trying to code custom logic for the WindowedStream's apply() method. Basically, I want to reduce all elements of a window and then append a timestamp to the result, so I created a WindowedStream from a DataStream, but when I define the functions for apply(), compilation fails.
This is the code:
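import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Sums the third field of two elements that share the same key.
class WindowReduceFunction extends ReduceFunction[(Int, String, Int)] {
  override def reduce(a: (Int, String, Int), b: (Int, String, Int)): (Int, String, Int) = {
    (a._1, a._2, a._3 + b._3)
  }
}

// Appends the window's last timestamp to each (already reduced) element.
// The window parameter must be TimeWindow (the W type argument) for the override to match.
class WindowTimestampAddFunction extends WindowFunction[(Int, String, Int), (Int, String, Int, Long), (Int, String), TimeWindow] {
  override def apply(key: (Int, String), window: TimeWindow, in: Iterable[(Int, String, Int)], out: Collector[(Int, String, Int, Long)]): Unit = {
    for (row <- in) {
      out.collect((row._1, row._2, row._3, window.maxTimestamp()))
    }
  }
}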
The DataStream has element type (Int, String, Int) and the key is (Int, String). Without the apply() call, the code compiles and runs without errors, but when I add:
myWindowedStream.apply(new WindowReduceFunction(), new WindowTimestampAddFunction())
compilation fails with this error:
[ERROR] [R](preAggregator: ((Int, String, Int), (Int, String, Int)) => (Int, String, Int), windowFunction: (org.apache.flink.api.java.tuple.Tuple, org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[(Int, String, Int)], org.apache.flink.util.Collector[R]) => Unit)(implicit evidence$6: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R] <and>
[ERROR] [R](preAggregator: org.apache.flink.api.common.functions.ReduceFunction[(Int, String, Int)], function: org.apache.flink.streaming.api.scala.function.WindowFunction[(Int, String, Int),R,org.apache.flink.api.java.tuple.Tuple,org.apache.flink.streaming.api.windowing.windows.TimeWindow])(implicit evidence$5: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
[ERROR] cannot be applied to (WindowReduceFunction, WindowTimestampAddFunction)
[ERROR] .apply(new WindowReduceFunction(), new WindowTimestampAddFunction())
[ERROR] ^
[ERROR] one error found
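You are using either an index position key, as in keyBy(1), or a field expression key, as in keyBy("field"). This means that the key type of the WindowedStream is Tuple (org.apache.flink.api.java.tuple.Tuple, to be specific). The error message confirms this: both apply overloads it lists expect org.apache.flink.api.java.tuple.Tuple as the key type, not (Int, String).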
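A minimal sketch of a WindowFunction declared against the Tuple key that a position key produces is shown below. It assumes keyBy(0, 1) and 5-second tumbling time windows via timeWindow(Time.seconds(5)), neither of which appears in the question; TupleKeyExample and withWindowEnd are hypothetical names used only for illustration.

```scala
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Sums the third field; elements in one window share the same first two fields.
class WindowReduceFunction extends ReduceFunction[(Int, String, Int)] {
  override def reduce(a: (Int, String, Int), b: (Int, String, Int)): (Int, String, Int) =
    (a._1, a._2, a._3 + b._3)
}

// KEY is Tuple because keyBy(0, 1) is a position key.
class WindowTimestampAddFunction
    extends WindowFunction[(Int, String, Int), (Int, String, Int, Long), Tuple, TimeWindow] {
  override def apply(
      key: Tuple,
      window: TimeWindow,
      in: Iterable[(Int, String, Int)],
      out: Collector[(Int, String, Int, Long)]): Unit = {
    // With a pre-aggregating ReduceFunction, `in` holds the single reduced element.
    for (row <- in) {
      out.collect((row._1, row._2, row._3, window.maxTimestamp()))
    }
  }
}

object TupleKeyExample {
  // Hypothetical wiring; the window size is an assumption.
  def withWindowEnd(input: DataStream[(Int, String, Int)]): DataStream[(Int, String, Int, Long)] =
    input
      .keyBy(0, 1)                 // key type: org.apache.flink.api.java.tuple.Tuple
      .timeWindow(Time.seconds(5)) // WindowedStream[(Int, String, Int), Tuple, TimeWindow]
      .apply(new WindowReduceFunction(), new WindowTimestampAddFunction())
}
```

The apply(ReduceFunction, WindowFunction) overload used here is the one listed in the error message; later Flink releases deprecate it in favor of reduce(ReduceFunction, WindowFunction), so check which one your version provides.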
If you change the third generic argument of your WindowFunction from (Int, String) to Tuple, it should work. Alternatively, change your keyBy call to use a lambda function (a key selector); the WindowedStream then gets the specific key type you expect. For example: keyBy(in => (in._1, in._2)).
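A sketch of the key-selector alternative follows, so the WindowFunction can keep (Int, String) as its key type. As before, the 5-second tumbling time window and the names LambdaKeyExample and withWindowEnd are assumptions for illustration, not part of the question.

```scala
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Sums the third field; elements in one window share the same key.
class WindowReduceFunction extends ReduceFunction[(Int, String, Int)] {
  override def reduce(a: (Int, String, Int), b: (Int, String, Int)): (Int, String, Int) =
    (a._1, a._2, a._3 + b._3)
}

// KEY stays (Int, String) because the key comes from a key-selector lambda.
class WindowTimestampAddFunction
    extends WindowFunction[(Int, String, Int), (Int, String, Int, Long), (Int, String), TimeWindow] {
  override def apply(
      key: (Int, String),
      window: TimeWindow,
      in: Iterable[(Int, String, Int)],
      out: Collector[(Int, String, Int, Long)]): Unit = {
    // The typed key can be used directly instead of reading fields from the row.
    in.foreach(row => out.collect((key._1, key._2, row._3, window.maxTimestamp())))
  }
}

object LambdaKeyExample {
  // Hypothetical wiring; the window size is an assumption.
  def withWindowEnd(input: DataStream[(Int, String, Int)]): DataStream[(Int, String, Int, Long)] =
    input
      .keyBy(in => (in._1, in._2))  // key type: (Int, String)
      .timeWindow(Time.seconds(5))  // WindowedStream[(Int, String, Int), (Int, String), TimeWindow]
      .apply(new WindowReduceFunction(), new WindowTimestampAddFunction())
}
```

A key selector also keeps the key type checked at compile time, which is why later Flink releases steer users away from position keys such as keyBy(0, 1).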