Flink Streaming - apply function in windows

I am new to Flink and to streaming in general. I want to apply a certain function per partition to each window of the stream (event time is used). This is what I have so far:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val inputStream = env.readTextFile("dataset.txt")
      .map(transformStream(_))
      .assignAscendingTimestamps(_.eventTime)
      .keyBy(_.id)
      .timeWindow(Time.seconds(windowSize),Time.seconds(slidingStep))

def transformStream(input: String): EventStream = {...}

case class EventStream(val eventTime: Long, val id: String, actualEvent: String)

What I want to do is apply a general function to each partition per window, for example a complex processing algorithm. I have seen the apply method in the DataStream API, but I did not understand how it works. The Flink API documentation says it is used like this in Scala:

inputStream.apply { WindowFunction }

Can someone explain what the apply method does and how it is used? An example in Scala would be preferable. Does the apply method do what I want?

Al Jenssen asked Mar 11 '23 00:03


1 Answer

Basically, there are two possible directions, depending on the type of calculation you want to do: use fold / reduce / aggregate, or the more generic one you already mentioned, apply. All of them operate on the windows of a single key.

apply is a very generic way of applying computations. The most basic version (in Scala) is:

def apply[R: TypeInformation](function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R]

where function takes 4 parameters:

  • the key of the window (remember you are working on a KeyedStream)
  • the window itself (you can extract e.g. the start or end of the window from it)
  • the elements that were assigned to this particular window and key
  • a collector to which you should emit results of your processing
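
As a minimal sketch of how those four parameters fit together, assuming the same Flink 1.x Scala API that the question uses: the sample elements, the window sizes, the object name and the summarize helper are all made up for illustration and are not part of the original post.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Same shape as the case class in the question.
case class EventStream(eventTime: Long, id: String, actualEvent: String)

object WindowApplySketch {

  // Hypothetical per-window computation: joins the payloads seen for one key in one window.
  def summarize(key: String, start: Long, end: Long, events: Iterable[EventStream]): String =
    s"$key [$start, $end): " + events.map(_.actualEvent).mkString(",")

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    env.fromElements(                       // illustrative data, timestamps ascending
        EventStream(1000L, "a", "x"),
        EventStream(2000L, "b", "y"),
        EventStream(3000L, "a", "z"))
      .assignAscendingTimestamps(_.eventTime)
      .keyBy(_.id)
      .timeWindow(Time.seconds(10), Time.seconds(5))
      // Called once per key and window when the window fires; all buffered elements are passed in.
      .apply { (key: String, window: TimeWindow, events: Iterable[EventStream], out: Collector[String]) =>
        out.collect(summarize(key, window.getStart, window.getEnd, events))
      }
      .print()

    env.execute("window apply sketch")
  }
}
```

Keeping the actual computation in a plain function such as summarize makes it possible to test the per-window logic without starting a Flink job.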

Keep in mind, though, that this version has to keep every element in state until the window fires. A more memory-efficient option is the variant with a pre-aggregator, which combines elements incrementally as they arrive and passes only the reduced result to the function described above.

Here is a short snippet that uses pre-aggregation:

val stream: DataStream[(String,Int)] =   ...

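stream.keyBy(_._1)
      .window(EventTimeSessionWindows.withGap(Time.seconds(conf.sessionGap())))
      // pre-aggregator: merges the elements of a key as they arrive, so only one value is kept per window
      .apply((e1, e2) => (e1._1, e1._2 + e2._2),
             // window function: receives the pre-aggregated value once the window fires
             (key, window, in, out: Collector[(String, Long, Long, Int)]) => {
                out.collect((key, window.getStart, window.getEnd, in.map(_._2).sum))
             })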

which sums the second tuple field per key in each session window, i.e. it counts the appearances of a key when every element carries the value 1.

So, if you don't need the window's meta information, I would stick to fold / reduce / aggregate when they are sufficient. Then consider apply with some kind of pre-aggregation, and only if that is not enough take a look at the most generic apply.
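
For the first of those options, a plain reduce might look roughly like the sketch below. It assumes the Flink 1.x Scala API; the tuple layout (timestamp, key, count), the sample data, the window size and the names are invented for the example.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowReduceSketch {

  // Hypothetical element layout: (event time in ms, key, count).
  type Counted = (Long, String, Int)

  // Combines two partial results of the same key: keeps the later timestamp and adds the counts.
  val merge: (Counted, Counted) => Counted =
    (a, b) => (math.max(a._1, b._1), a._2, a._3 + b._3)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    env.fromElements((1000L, "a", 1), (2000L, "b", 1), (3000L, "a", 1)) // illustrative data
      .assignAscendingTimestamps(_._1)
      .keyBy(_._2)
      .timeWindow(Time.seconds(10))
      // Applied incrementally as elements arrive, so only one value per key and window is kept in state.
      .reduce(merge)
      .print()

    env.execute("window reduce sketch")
  }
}
```

The output carries no window start or end here; when those are needed, the pre-aggregating variant shown earlier is the natural next step.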

For more complete examples you may take a look here.

Dawid Wysakowicz answered Mar 19 '23 13:03