
Flink streaming event time window ordering

I'm having trouble understanding the semantics of event-time windowing. The following program generates some tuples with timestamps that are used as event time and performs a simple window aggregation. I would expect the output to be in the same order as the input, but it is ordered differently. Why is the output out of order with respect to event time?

import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._

object WindowExample extends App {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.enableTimestamps()
    env.setParallelism(1)

    val start = 1449597577379L
    val tuples = (1 to 10).map(t => (start + t * 1000, t))

    env.fromCollection(tuples)
      .assignAscendingTimestamps(_._1)
      .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
      .sum(1)
      .print()

    env.execute()
}

The input:

 (1449597578379,1)
 (1449597579379,2)
 (1449597580379,3)
 (1449597581379,4)
 (1449597582379,5)
 (1449597583379,6)
 (1449597584379,7)
 (1449597585379,8)
 (1449597586379,9)
 (1449597587379,10)

Result:

[info] (1449597579379,2)
[info] (1449597581379,4)
[info] (1449597583379,6)
[info] (1449597585379,8)
[info] (1449597587379,10)
[info] (1449597578379,1)
[info] (1449597580379,3)
[info] (1449597582379,5)
[info] (1449597584379,7)
[info] (1449597586379,9)
asked Dec 08 '15 19:12 by bandrews




1 Answer

The reason for this behavior is that Flink does not take the order of elements (with respect to their timestamps) into account. For time-based operations, only the correctness of the watermarks and their relation to the element timestamps matters, because it is normally the watermarks that trigger computation.

In your example, the window operator first stores all elements from the source in its internal window buffers. The source then emits a watermark, which says that no elements with a smaller timestamp will arrive in the future. This tells the window operator to process all windows whose end timestamp is below the watermark (which is true for all windows here). It therefore emits all windows, in arbitrary order, and then emits a watermark itself. Downstream operations receive these elements and can in turn do their processing once they receive the watermark.
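To make the mechanism concrete, here is a toy model in plain Scala with no Flink dependency. It is only a sketch and not how Flink is actually implemented: the object name, the `run` helper, and the hash-map buffer are all made up for illustration. Windows are buffered by their start time and emitted only when a watermark passes their last timestamp. If a single watermark arrives after all the input, every window becomes ready at once and comes out in the buffer's iteration order, which is unspecified. If a watermark follows every element, at most one window is ready at a time, so the output follows event time.

```scala
import scala.collection.mutable

object WatermarkFiringSketch {
  val windowSizeMs = 1000L
  val start = 1449597577379L
  // Same input as in the question: (timestamp, value) pairs one second apart.
  val tuples: Seq[(Long, Int)] = (1 to 10).map(t => (start + t * 1000L, t))

  // Start of the tumbling window that a timestamp falls into.
  def windowStart(ts: Long): Long = ts - (ts % windowSizeMs)

  // Runs the toy operator. A watermark follows either every element
  // or only the end of the input.
  def run(watermarkPerElement: Boolean): Seq[(Long, Int)] = {
    // Buffered windows keyed by window start; a hash map has no defined iteration order.
    val buffers = mutable.HashMap.empty[Long, (Long, Int)]
    val out = mutable.ArrayBuffer.empty[(Long, Int)]

    // Emit every buffered window whose last timestamp is at or below the watermark.
    def onWatermark(wm: Long): Unit = {
      val ready = buffers.keys.filter(k => k + windowSizeMs - 1 <= wm).toList
      ready.foreach(k => out += buffers.remove(k).get)
    }

    tuples.foreach { case (ts, v) =>
      val key = windowStart(ts)
      // Sum the value field; this toy keeps the timestamp field of the first element.
      val merged = buffers.get(key)
        .map { case (firstTs, sum) => (firstTs, sum + v) }
        .getOrElse((ts, v))
      buffers(key) = merged
      if (watermarkPerElement) onWatermark(ts)
    }
    onWatermark(Long.MaxValue) // end of input: flush whatever is still buffered
    out.toSeq
  }

  def main(args: Array[String]): Unit = {
    println("one watermark at the end: " + run(watermarkPerElement = false).map(_._2))
    println("watermark per element:    " + run(watermarkPerElement = true).map(_._2))
  }
}
```

The first line printed may or may not be sorted, depending on how the hash map happens to lay out its keys. The second line is always in timestamp order.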

By default, sources emit watermarks at an interval of 200 ms. Because your source emits so few elements, all of them are emitted before the first watermark. In a real-world use case, where the watermark interval is much smaller than the window size, you would get the expected behavior of windows being emitted in timestamp order, for example with 1-hour windows and a watermark every 500 ms.
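For reference, the watermark interval is a setting on the execution config. The fragment below is a sketch that assumes the same Flink version and API as the question; the object name is made up for illustration. Note that lowering the interval is unlikely to change the output of the ten-element example, since a collection source finishes emitting almost immediately. The setting matters for long-running sources.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._

object WatermarkIntervalExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Interval in milliseconds at which sources emit watermarks;
    // 200 ms is the default mentioned above.
    env.getConfig.setAutoWatermarkInterval(200L)
    // ... define sources, windows and sinks here, then call env.execute()
  }
}
```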

answered Nov 16 '22 14:11 by aljoscha
