How frequently does Flink de/serialise operator state? Per get/update or based on checkpoints? Does the state backend make a difference?
I suspect that in the case of a keyed stream with a very large number of distinct keys (millions) and thousands of events per second for each key, the de/serialization might be a big issue. Am I right?
The Flink TaskManager is allocated 1.5 CPU cores and 4 GB of memory. The job uses the RocksDB state backend, which is configured to use Flink's managed memory.
In Flink, state snapshots, i.e., checkpoints and savepoints, are stored in remote durable storage and are used to restore the local state in case of job failures. The appropriate state backend for a production deployment depends on scalability, throughput, and latency requirements.
A checkpoint in Flink is a global, asynchronous snapshot of application state that is taken at a regular interval and sent to durable storage (usually a distributed file system). In the event of a failure, Flink restarts the application using the most recently completed checkpoint as its starting point.
The keyed state interfaces provide access to different types of state that are all scoped to the key of the current input element. This means that this type of state can only be used on a KeyedStream, which can be created via stream.
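To make the key scoping concrete, here is a toy model in plain Java with no Flink dependency. ToyKeyedValueState and ToyKeyedCounter are hypothetical classes, not Flink's API; setCurrentKey stands in for what the runtime arranges once keyBy has routed an element to its key, and value()/update() merely mirror the method names of Flink's ValueState.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy class, NOT Flink's API: one logical value per key,
// and every access is implicitly scoped to the current key.
class ToyKeyedValueState<K, V> {
    private final Map<K, V> perKey = new HashMap<>();
    private K currentKey; // set by the "runtime" before each element is processed

    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    V value() {
        return perKey.get(currentKey); // null if this key has no state yet
    }

    void update(V newValue) {
        perKey.put(currentKey, newValue);
    }
}

// Hypothetical driver: counts events per key, the way a keyed function
// holding a ValueState<Long> counter would.
class ToyKeyedCounter {
    static Map<String, Long> countPerKey(String[] eventKeys) {
        ToyKeyedValueState<String, Long> count = new ToyKeyedValueState<>();
        Map<String, Long> latest = new HashMap<>();
        for (String key : eventKeys) {
            count.setCurrentKey(key);     // stands in for keyBy routing
            Long current = count.value(); // sees only this key's state
            long next = (current == null ? 0L : current) + 1;
            count.update(next);
            latest.put(key, next);
        }
        return latest;
    }
}
```

The same state handle returns a different value depending on the current key, which is why this kind of state only makes sense on a keyed stream.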
Your assumption is correct, though it depends on the state backend.
Backends that store state on the JVM heap (MemoryStateBackend and FsStateBackend) do not serialize state for regular read/write accesses but keep it as objects on the heap. While this leads to very fast accesses, you are obviously bound by the size of the JVM heap and might also face garbage collection issues. When a checkpoint is taken, the objects are serialized and persisted to enable recovery in case of a failure.
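The heap-backend behaviour can be sketched with a toy model in plain Java. ToyHeapBackend is a hypothetical class, not Flink code, and DataOutputStream is only an assumed stand-in for Flink's type serializers; the point is that get/put touch plain objects while checkpoint() is the only place values are serialized.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of a heap-based backend (not Flink code):
// state stays as Java objects, and serialization happens only at checkpoint time.
class ToyHeapBackend {
    private final Map<String, String> state = new HashMap<>();
    int valuesSerialized = 0; // how many values have been serialized so far

    String get(String key) {
        return state.get(key); // plain object lookup, no serialization
    }

    void put(String key, String value) {
        state.put(key, value); // stores the reference, no serialization
    }

    // Serializes every entry into a snapshot that could then be persisted.
    byte[] checkpoint() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(state.size());
            for (Map.Entry<String, String> entry : state.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeUTF(entry.getValue());
                valuesSerialized++;
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return bytes.toByteArray();
    }
}
```

However many reads and writes happen between checkpoints, the serialization cost in this model is paid once per entry per checkpoint.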
In contrast, the RocksDBStateBackend stores all state as byte arrays in embedded RocksDB instances. Therefore, it de/serializes the state of a key on every read/write access. You can control how much state is serialized by choosing an appropriate state primitive, e.g., ValueState, ListState, or MapState.
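A toy model of the byte-array approach, again in plain Java: ToyBytesBackend is hypothetical and not Flink or RocksDB code, a HashMap keyed by ByteBuffer stands in for the RocksDB instance, and UTF-8 encoding stands in for Flink's serializers. It counts how often each access has to serialize or deserialize.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of a RocksDB-style backend (not real RocksDB code):
// keys and values exist only as bytes, so every access pays for (de)serialization.
class ToyBytesBackend {
    // ByteBuffer compares by content, so equal key bytes find the same entry.
    private final Map<ByteBuffer, byte[]> store = new HashMap<>();
    int serializeCalls = 0;
    int deserializeCalls = 0;

    private byte[] serialize(String s) {
        serializeCalls++;
        return s.getBytes(StandardCharsets.UTF_8);
    }

    private String deserialize(byte[] b) {
        deserializeCalls++;
        return new String(b, StandardCharsets.UTF_8);
    }

    // Read: serialize the key for the lookup, then deserialize the stored value.
    String get(String key) {
        byte[] raw = store.get(ByteBuffer.wrap(serialize(key)));
        return raw == null ? null : deserialize(raw);
    }

    // Write: serialize both the key and the value.
    void put(String key, String value) {
        store.put(ByteBuffer.wrap(serialize(key)), serialize(value));
    }
}
```

With thousands of events per second per key, each of those accesses repeats this work, which is where the cost raised in the question comes from.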
For example, a ValueState is always de/serialized as a whole, whereas MapState.get(key) only serializes the key (for the lookup) and deserializes the value returned for that key. Hence, you should use MapState<String, String> instead of ValueState<HashMap<String, String>>. Similar considerations apply to the other state primitives.
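A rough cost model of the two layouts for a single lookup, in plain Java. ToyLookupCost is hypothetical, and the byte counts come from DataOutputStream.writeUTF rather than Flink's serializers; a real RocksDB key also carries a key-group prefix and namespace, which this sketch ignores.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;

// Hypothetical toy cost model (not Flink code): bytes that one lookup
// has to (de)serialize under the two state layouts.
class ToyLookupCost {
    // Size of one string as written by DataOutputStream.writeUTF (2-byte length + data).
    static int utfSize(String s) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(s);
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return bytes.size();
    }

    // ValueState<HashMap<String, String>>: value() deserializes the whole map,
    // even when the caller needs only one entry.
    static int valueStateLookupBytes(Map<String, String> wholeMap) {
        int total = 4; // entry-count header
        for (Map.Entry<String, String> entry : wholeMap.entrySet()) {
            total += utfSize(entry.getKey()) + utfSize(entry.getValue());
        }
        return total;
    }

    // MapState<String, String>.get(key): serialize the key for the lookup and
    // deserialize only the value stored under it.
    static int mapStateLookupBytes(String key, String value) {
        return utfSize(key) + utfSize(value);
    }
}
```

For a per-key map with n entries, the ValueState layout grows linearly in n for every lookup, while the MapState layout stays proportional to one entry.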
The RocksDBStateBackend checkpoints its state by copying its files to a persistent file system. Hence, no additional serialization is involved when a checkpoint is taken.
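The checkpoint side can be sketched with one more toy model in plain Java. ToyFileCheckpoint is hypothetical and not RocksDB code: each stored record stands in for an already-serialized, immutable file, so the checkpoint is a plain byte copy and the serializer counter does not move.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical toy model (not Flink or RocksDB code): state is already held
// as serialized, immutable "files", so a checkpoint only copies bytes.
class ToyFileCheckpoint {
    private final List<byte[]> files = new ArrayList<>();
    int serializeCalls = 0;

    // Each write serializes its record once, when it enters the store.
    void put(String key, String value) {
        serializeCalls++;
        files.add((key + "=" + value).getBytes(StandardCharsets.UTF_8));
    }

    // Checkpoint: copy every file's bytes verbatim; no serializer is invoked.
    List<byte[]> checkpoint() {
        List<byte[]> copy = new ArrayList<>();
        for (byte[] file : files) {
            copy.add(Arrays.copyOf(file, file.length));
        }
        return copy;
    }
}
```

Compared with the heap model, the serialization cost has moved from checkpoint time to access time.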