I am trying to use a ProcessWindowFunction in my Apache Flink project using Scala. Unfortunately, I already fail at implementing a basic ProcessWindowFunction like it is used in the Apache Flink Documentation.
This is my code:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time
import org.fiware.cosmos.orion.flink.connector.{NgsiEvent, OrionSource}
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.util.Collector
import scala.collection.TraversableOnce
object StreamingJob {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val eventStream = env.addSource(new OrionSource(9001))
val processedDataStream = eventStream.flatMap(event => event.entities)
.map(entity => (entity.id, entity.attrs("temperature").value.asInstanceOf[String]))
.keyBy(_._1)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.process(new MyProcessWindowFunction())
env.execute("Socket Window NgsiEvent")
}
}
private class MyProcessWindowFunction extends ProcessWindowFunction[(String, String), String, String, TimeWindow] {
def process(key: String, context: Context, input: Iterable[(String, String)], out: Collector[String]): Unit = {
var count: Int = 0
for (in <- input) {
count = count + 1
}
out.collect(s"Window ${context.window} count: $count")
}
}
From IntelliJ I get the following hints:
1) This is shown where the new class object is created:
Type mismatch, expected: ProcessWindowFunction[(String, String), NotInferedR, String, TimeWindow], actual: MyProcessWindowFunction
2) This is shown directly at the class:
Class 'MyProcessWindowFunction' must either be declared abstract or implement abstract member 'process(key:KEY, context:ProcessWindowFunction.Context, iterable:Iterable<IN>, collector:Collector<OUT>):void' in 'org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction'
Building the code shows me the following error:
Error:(51, 16) type mismatch;
found : org.apache.flink.MyProcessWindowFunction
required:
org.apache.flink.streaming.api.scala.function.ProcessWindowFunction[(String, String),?,String,org.apache.flink.streaming.api.windowing.windows.TimeWindow]
.process(new MyProcessWindowFunction())
I am grateful for every help.
This documentation is for an out-of-date version of Apache Flink. We recommend you use the latest stable version . The ProcessFunction is a low-level stream processing operation, giving access to the basic building blocks of all (acyclic) streaming applications: timers (event time and processing time, only on keyed stream)
A WindowAssigner is responsible for assigning each incoming element to one or more windows. Flink comes with pre-defined window assigners for the most common use cases, namely tumbling windows , sliding windows, session windows and global windows. You can also implement a custom window assigner by extending the WindowAssigner class.
When working with event-time windowing, it can happen that elements arrive late, i.e. the watermark that Flink uses to keep track of the progress of event-time is already past the end timestamp of a window to which an element belongs. See event time and especially late elements for a more thorough discussion of how Flink deals with event time.
When watermarks arrive at the window operator this triggers two things: the watermark triggers computation of all windows where the maximum timestamp (which is end-timestamp - 1) is smaller than the new watermark
After spending some time debugging with 2 more people we finally managed to find the problem.
In my code I used the following import:
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction
But the correct import when using Scala seems to be:
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
//package of ProcessWindowFunction is
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
//The correct way to call this method
new MyProcessWindowFunction()[(String, String), String, String, TimeWindow]
//I know the official documents don't.This may be a bug
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With