I am implementing a custom operator that has a large state (which may not fit in memory), and I am trying to use ListState for this purpose. I obtain it with
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
as described in https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#using-managed-operator-state
The implementation of snapshotState() in the linked example clears checkpointedState and then adds the elements from an in-memory data structure to it.
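For reference, the clear-and-re-add pattern of that snapshotState() can be sketched as below. This is a minimal, self-contained sketch, not Flink code: `SimpleListState` and `InMemoryListState` are hypothetical stand-ins that mirror only the `get()`, `add()` and `clear()` methods of Flink's `ListState` so that the snippet compiles without Flink (the real `get()` and `add()` also declare `throws Exception`). `snapshot` is likewise a hypothetical helper name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SnapshotSketch {

    // Hypothetical stand-in for Flink's ListState<T>: only get(), add() and clear().
    interface SimpleListState<T> {
        Iterable<T> get();
        void add(T value);
        void clear();
    }

    // Minimal heap-backed implementation so the sketch runs without Flink.
    static class InMemoryListState<T> implements SimpleListState<T> {
        private final List<T> items = new ArrayList<>();
        public Iterable<T> get() { return items; }
        public void add(T value) { items.add(value); }
        public void clear() { items.clear(); }
    }

    // Mirrors the body of snapshotState() in the linked example: the whole
    // heap buffer is written into the list state on every checkpoint.
    static <T> void snapshot(SimpleListState<T> checkpointedState, List<T> bufferedElements) {
        checkpointedState.clear();          // discard the previous snapshot
        for (T element : bufferedElements) {
            checkpointedState.add(element); // re-add every buffered element
        }
    }

    public static void main(String[] args) {
        InMemoryListState<String> state = new InMemoryListState<>();
        List<String> buffered = new ArrayList<>(Arrays.asList("a", "b"));
        snapshot(state, buffered);
        buffered.add("c");
        snapshot(state, buffered);
        System.out.println(state.get()); // [a, b, c]
    }
}
```

Because clear() runs first, each snapshot replaces the previous contents instead of appending to them.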
Instead, I need something like the following in snapshotState():
Is there any way to selectively remove items from ListState?
No, removing specific elements from a ListState is unfortunately not possible.
If you want to preserve specific list entries, you have to fetch them into a collection before clearing the ListState and reinsert them afterwards.
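A minimal sketch of that copy, clear and re-add workaround, assuming you can express "entries to keep" as a predicate. `SimpleListState`, `InMemoryListState` and `retainIf` are hypothetical names used only for illustration; in Flink you would call the same `get()`, `clear()` and `add()` methods on your `ListState` (and handle their checked exceptions).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class RetainSketch {

    // Hypothetical stand-in for Flink's ListState<T>: only get(), add() and clear().
    interface SimpleListState<T> {
        Iterable<T> get();
        void add(T value);
        void clear();
    }

    // Minimal heap-backed implementation so the sketch runs without Flink.
    static class InMemoryListState<T> implements SimpleListState<T> {
        private final List<T> items = new ArrayList<>();
        public Iterable<T> get() { return items; }
        public void add(T value) { items.add(value); }
        public void clear() { items.clear(); }
    }

    // Emulates selective removal: copy the entries to keep into a separate
    // collection, clear the state, then re-add only the kept entries.
    static <T> void retainIf(SimpleListState<T> state, Predicate<? super T> keep) {
        List<T> kept = new ArrayList<>();
        Iterable<T> current = state.get();
        if (current != null) {             // defensive: treat null as empty
            for (T element : current) {
                if (keep.test(element)) {
                    kept.add(element);
                }
            }
        }
        // The copy must be finished before clear(): the Iterable returned by
        // get() may be backed by the same storage that clear() empties.
        state.clear();
        for (T element : kept) {
            state.add(element);
        }
    }

    public static void main(String[] args) {
        InMemoryListState<Integer> state = new InMemoryListState<>();
        for (int i = 1; i <= 6; i++) {
            state.add(i);
        }
        retainIf(state, x -> x % 2 == 0);  // "remove" all odd numbers
        System.out.println(state.get());   // [2, 4, 6]
    }
}
```

Note that the kept entries are temporarily held on the heap in `kept`, so this does not help if the retained part itself does not fit in memory.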
In the example that you reference, all state objects are stored in the bufferedElements variable and are only inserted into the ListState when a checkpoint is taken. That means the complete operator state is always held on the JVM heap in bufferedElements. You can also keep parts of the operator state in the ListState (instead of holding them on the heap), but accessing individual elements is then quite expensive because you have to traverse an iterator.
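To illustrate why individual access is expensive: the list state only exposes its contents as an `Iterable`, so finding one element means scanning from the start, which is O(n) per lookup. This is a hedged sketch with hypothetical names (`SimpleListState`, `InMemoryListState`, `findFirst`) standing in for Flink's `ListState`; it assumes elements are non-null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

class LookupSketch {

    // Hypothetical stand-in for Flink's ListState<T>: only get(), add() and clear().
    interface SimpleListState<T> {
        Iterable<T> get();
        void add(T value);
        void clear();
    }

    // Minimal heap-backed implementation so the sketch runs without Flink.
    static class InMemoryListState<T> implements SimpleListState<T> {
        private final List<T> items = new ArrayList<>();
        public Iterable<T> get() { return items; }
        public void add(T value) { items.add(value); }
        public void clear() { items.clear(); }
    }

    // No index or key lookup is available, so the iterator is walked from the
    // beginning until a match is found. Elements are assumed to be non-null.
    static <T> Optional<T> findFirst(SimpleListState<T> state, Predicate<? super T> matches) {
        Iterable<T> current = state.get();
        if (current == null) {             // defensive: treat null as empty
            return Optional.empty();
        }
        for (T element : current) {
            if (matches.test(element)) {
                return Optional.of(element);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        InMemoryListState<String> state = new InMemoryListState<>();
        state.add("apple");
        state.add("banana");
        state.add("cherry");
        // Visits "apple" and "banana" before reaching the match.
        System.out.println(findFirst(state, s -> s.startsWith("c")).orElse("none")); // cherry
    }
}
```

With a state backend that does not keep the list on the heap, each step of that scan may also involve deserializing the element, which adds to the cost.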