Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Flink window state size and state management

After reading flink's documentation and searching around, i couldn't entirely understand how flink's handles state in its windows. Lets say i have an hourly tumbling window with an aggregation function that accumulate msgs into some java pojo or scala case class. Will The size of that window be tied to the number of events entering that window in a single hour, or will it just be tied to the pojo/case class, as im accumalting the events into that object. (e.g if counting 10000 msgs into an integer, will the size be close to 10000 * msg size or size of an int?) Also, if im using pojos or case classes, does flink handle the state for me (spills to disk if memory exhausted/saves state at check points etc) or must i use flink's state objects for that?

Thanks for your help!

like image 426
yaarix Avatar asked Mar 19 '19 18:03

yaarix


People also ask

How does Flink store state?

State in FlinkState snapshots, i.e., checkpoints and savepoints, are stored in a remote durable storage, and are used to restore the local state in the case of job failures. The appropriate state backend for a production deployment depends on scalability, throughput, and latency requirements.

What is Flink windowing?

Windowing is an approach to break the data stream into mini-batches or finite streams to apply different transformations on it. Flink window opens when the first data element arrives and closes when it meets our criteria to close a window. It can be based on time, count of messages or a more complex condition.

What is a state in Flink?

There are two basic types of states in Flink: keyed state and operator state. The difference between them is that a keyed state is always bound to keys and can only be used on keyed streams. In operator state, the state is bound to an operator on one parallel substream.

What is sink in Flink?

This connector provides a Sink that writes partitioned files to filesystems supported by the Flink FileSystem abstraction. The streaming file sink writes incoming data into buckets. Given that the incoming streams can be unbounded, data in each bucket are organized into part files of finite size.

How to determine the managed status of a Flink application?

Managed status is determined by Flink Status of framework management , And the original state , The user manages the state specific data structure , What is the framework doing checkpoint When , use byte [] To read and write status content , Know nothing about its internal data structure .

What is the data model of Flink?

The data model of Flink is not based on key-value pairs. Therefore, you do not need to physically pack the data set types into keys and values. Keys are “virtual”: they are defined as functions over the actual data to guide the grouping operator. The following example shows a key selector function that simply returns the field of an object:

How to get late data from a global window in Flink?

When using the GlobalWindowswindow assigner no data is ever considered late because the end timestamp of the global window is Long.MAX_VALUE. Getting late data as a side output # Using Flink’s side outputfeature you can get a stream of the data that was discarded as late.

What version of Apache Flink should I use?

Consecutive windowed operations Useful state size considerations This documentation is for an out-of-date version of Apache Flink. We recommend you use the latest stable version. Windows # Windows are at the heart of processing infinite streams. Windows split the stream into “buckets” of finite size, over which we can apply computations.


1 Answers

The state size of a window depends on the type of function that you apply. If you apply a ReduceFunction or AggregateFunction, arriving data is immediately aggregated and the window only holds the aggregated value. If you apply a ProcessWindowFunction or WindowFunction, Flink collects all input records and applies the function when time (event or processing time depending on the window type) passes the window's end time.

You can also combine both types of functions, i.e., have an AggregateFunction followed by a ProcessWindowFunction. In that case, arriving records are immediately aggregated and when the window is closed, the aggregation result is passed as single value to the ProcessWindowFunction. This is useful because you have incremental aggregation (due to ReduceFunction / AggregateFunction) but also access to the window metadata like begin and end timestamp (due to ProcessWindowFunction).

How the state is managed depends on the chosen state backend. If you configure the FsStateBackend all local state is kept on the heap of the TaskManager and the JVM process is killed with an OutOfMemoryError if the state grows too large. If you configure the RocksDBStateBackend state is spilled to disk. This comes with de/serialization costs for every state access but gives much more storage for state.

like image 175
Fabian Hueske Avatar answered Sep 21 '22 16:09

Fabian Hueske