
Apache Flink: How often is state de/serialized?

How frequently does Flink de/serialize operator state? On every get/update, or only when a checkpoint is taken? Does the state backend make a difference?

I suspect that for a keyed stream with a large key space (millions of keys) and thousands of events per second for each key, de/serialization might become a significant cost. Am I right?

Reza Same'ei asked Jul 25 '18 12:07





1 Answer

Your assumption is correct: how often state is de/serialized depends on the state backend.

Backends that store state on the JVM heap (MemoryStateBackend and FsStateBackend) do not serialize state for regular read/write accesses but keep it as objects on the heap. While this makes accesses very fast, the state size is limited by the JVM heap, and you might also face garbage collection issues. When a checkpoint is taken, the objects are serialized and persisted to enable recovery in case of a failure.

In contrast, the RocksDBStateBackend stores all state as byte arrays in embedded RocksDB instances. Therefore, it de/serializes the state of a key for every read/write access. You can control "how much" state is serialized by choosing an appropriate state primitive, i.e., ValueState, ListState, MapState, etc.

For example, ValueState is always de/serialized as a whole, whereas a MapState.get(key) only serializes the key (for the lookup) and deserializes the returned value for the key. Hence, you should use MapState<String, String> instead of ValueState<HashMap<String, String>>. Similar considerations apply for the other state primitives.
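To make the difference concrete, here is a rough sketch in plain Java. It does not use the Flink API; it only imitates the two storage layouts with byte arrays, and the class name, method names, and the simple encoding are all made up for illustration. It counts how many bytes one lookup has to touch when the whole map is stored as a single value versus when each map entry is stored separately.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Illustration only: imitates the two layouts, not Flink's actual serializers.
public class StateAccessCost {

    // Encodes the whole map as one blob, like a ValueState<HashMap<String, String>> value.
    static byte[] serializeMap(Map<String, String> map) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(map.size());
            for (Map.Entry<String, String> e : map.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeUTF(e.getValue());
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return bytes.toByteArray();
    }

    // Decodes a blob produced by serializeMap back into a map.
    static Map<String, String> deserializeMap(byte[] blob) {
        Map<String, String> map = new HashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(blob))) {
            int size = in.readInt();
            for (int i = 0; i < size; i++) {
                String key = in.readUTF();
                map.put(key, in.readUTF());
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return map;
    }

    // Whole-value layout: one lookup has to deserialize the entire blob.
    static long bytesTouchedWholeValue(Map<String, String> state, String lookupKey) {
        byte[] blob = serializeMap(state);               // what the store would hold
        Map<String, String> restored = deserializeMap(blob);
        restored.get(lookupKey);                          // the actual lookup
        return blob.length;
    }

    // Per-entry layout: one lookup serializes the key and deserializes one value.
    static long bytesTouchedPerEntry(Map<String, String> state, String lookupKey) {
        Map<String, byte[]> store = new HashMap<>();      // one byte[] per map entry
        for (Map.Entry<String, String> e : state.entrySet()) {
            store.put(e.getKey(), e.getValue().getBytes(StandardCharsets.UTF_8));
        }
        byte[] keyBytes = lookupKey.getBytes(StandardCharsets.UTF_8);
        byte[] valueBytes = store.get(lookupKey);
        return keyBytes.length + (valueBytes == null ? 0 : valueBytes.length);
    }

    public static void main(String[] args) {
        Map<String, String> state = new HashMap<>();
        for (int i = 0; i < 1000; i++) {
            state.put("k" + i, "v" + i);
        }
        System.out.println("whole value: " + bytesTouchedWholeValue(state, "k42") + " bytes");
        System.out.println("per entry:   " + bytesTouchedPerEntry(state, "k42") + " bytes");
    }
}
```

The absolute numbers depend entirely on the toy encoding, so only the ratio is meaningful: the whole-value cost grows with the number of entries, while the per-entry cost stays tied to the size of the one key and value being accessed.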

The RocksDBStateBackend checkpoints its state by copying its RocksDB files to a persistent filesystem. Hence, there is no additional serialization involved when a checkpoint is taken.
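The overall trade-off between the two kinds of backend can be sketched the same way. This is again plain Java and not Flink code: `HeapStore` and `ByteStore` are hypothetical stand-ins for a heap-based backend and a byte-array backend such as RocksDB, and the counter simply tallies every serialization or deserialization while a per-key counter is updated once per event.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Illustration only: hypothetical stand-ins, not Flink's state backends.
public class BackendSerializationCount {

    // Heap-style store: keeps live objects and serializes only in snapshot().
    static class HeapStore {
        final Map<String, Long> objects = new HashMap<>();
        int serializations = 0;

        long get(String key) { return objects.getOrDefault(key, 0L); }

        void put(String key, long value) { objects.put(key, value); }

        Map<String, byte[]> snapshot() {
            Map<String, byte[]> out = new HashMap<>();
            for (Map.Entry<String, Long> e : objects.entrySet()) {
                out.put(e.getKey(), encode(e.getValue()));
                serializations++;
            }
            return out;
        }
    }

    // Byte-array store: de/serializes on every access, snapshot just copies bytes.
    static class ByteStore {
        final Map<String, byte[]> bytes = new HashMap<>();
        int serializations = 0; // counts both serialization and deserialization

        long get(String key) {
            byte[] raw = bytes.get(key);
            if (raw == null) {
                return 0L; // nothing stored yet, nothing to deserialize
            }
            serializations++;
            return decode(raw);
        }

        void put(String key, long value) {
            serializations++;
            bytes.put(key, encode(value));
        }

        Map<String, byte[]> snapshot() { return new HashMap<>(bytes); }
    }

    static byte[] encode(long value) {
        return Long.toString(value).getBytes(StandardCharsets.UTF_8);
    }

    static long decode(byte[] raw) {
        return Long.parseLong(new String(raw, StandardCharsets.UTF_8));
    }

    // Processes `events` increments for a single key, then takes one snapshot of each store.
    // Returns {heap de/serializations, byte-store de/serializations}.
    static int[] run(int events) {
        HeapStore heap = new HeapStore();
        ByteStore byteStore = new ByteStore();
        for (int i = 0; i < events; i++) {
            heap.put("user", heap.get("user") + 1);
            byteStore.put("user", byteStore.get("user") + 1);
        }
        heap.snapshot();
        byteStore.snapshot();
        return new int[] { heap.serializations, byteStore.serializations };
    }

    public static void main(String[] args) {
        int[] counts = run(1000);
        System.out.println("heap store:  " + counts[0] + " de/serializations");
        System.out.println("byte store:  " + counts[1] + " de/serializations");
    }
}
```

In this toy model the heap store pays once per key at snapshot time, while the byte store pays on every read and write but nothing extra at snapshot time, which mirrors the behavior described above.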

Fabian Hueske answered Sep 27 '22 22:09
