
Removing selective items from ListState

Tags:

apache-flink

I am implementing a custom operator that has large state (which may not fit in memory). I am trying to use ListState for this purpose. I am using

checkpointedState = context.getOperatorStateStore().getListState(descriptor);

as mentioned in https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#using-managed-operator-state

The implementation of snapshotState() in the above link clears checkpointedState and then adds the elements from an in-memory data structure to checkpointedState.

Instead, I need something like the following in snapshotState():

  1. Remove specific entries from checkpointedState instead of calling clear().
  2. Add new elements from in-memory data structure to checkpointedState.

Is there any way to selectively remove items from ListState?

Yogi Devendra asked Mar 07 '23 13:03


1 Answer

No, removing specific elements from a ListState is unfortunately not possible. If you want to preserve specific list entries, you have to fetch them into a collection before clearing the ListState and reinsert them afterwards.
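The fetch, clear, and reinsert workaround could look roughly like the sketch below. To keep it runnable without Flink on the classpath, it uses a hypothetical SimpleListState interface that only mirrors the get(), add() and clear() methods of Flink's ListState; InMemoryListState, ListStateFilter and retainThenAppend are likewise made-up names for illustration, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical stand-in mirroring get()/add()/clear() of Flink's ListState. Not the Flink class. */
interface SimpleListState<T> {
    Iterable<T> get();
    void add(T value);
    void clear();
}

/** Heap-backed implementation, used only so the sketch can run on its own. */
class InMemoryListState<T> implements SimpleListState<T> {
    private final List<T> items = new ArrayList<>();

    @Override
    public Iterable<T> get() {
        // Return a copy so callers can iterate safely while the state is modified.
        return new ArrayList<>(items);
    }

    @Override
    public void add(T value) {
        items.add(value);
    }

    @Override
    public void clear() {
        items.clear();
    }
}

final class ListStateFilter {
    private ListStateFilter() {}

    /**
     * Emulates selective removal: keeps only the entries matching `keep`,
     * then appends `newElements`. The survivors are buffered on the heap
     * between clear() and the re-adds.
     */
    static <T> void retainThenAppend(SimpleListState<T> state,
                                     Predicate<T> keep,
                                     List<T> newElements) {
        // 1. Fetch the entries that should survive into a collection.
        List<T> survivors = new ArrayList<>();
        for (T element : state.get()) {
            if (keep.test(element)) {
                survivors.add(element);
            }
        }
        // 2. Clear the whole list state.
        state.clear();
        // 3. Reinsert the survivors, followed by the new elements.
        for (T element : survivors) {
            state.add(element);
        }
        for (T element : newElements) {
            state.add(element);
        }
    }
}
```

Inside snapshotState() this would be called with checkpointedState in place of the stand-in. Note that the surviving entries are held in memory for the duration of the call, so this only helps if that subset fits on the heap.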

In the example that you reference, all state objects are held in the bufferedElements variable and are only copied into the ListState when a checkpoint is taken. This means the complete operator state always lives on the JVM heap in bufferedElements. You can also keep parts of the operator state in the ListState itself (instead of holding them on the heap), but accessing individual elements is then quite expensive because you have to traverse an iterator.
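To illustrate the access cost: ListState exposes its contents as an Iterable, so reaching one particular element means stepping past everything in front of it. The helper below is a minimal sketch of that; IterableAccess and elementAt are hypothetical names, and it works on any Iterable, such as the one returned by ListState's get().

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

final class IterableAccess {
    private IterableAccess() {}

    /**
     * Returns the element at the given zero-based position by walking the
     * iterator from the start. Cost grows linearly with `index`, because an
     * Iterable offers no random access.
     */
    static <T> T elementAt(Iterable<T> elements, int index) {
        if (index < 0) {
            throw new IndexOutOfBoundsException("negative index: " + index);
        }
        Iterator<T> it = elements.iterator();
        // Skip the first `index` elements one by one.
        for (int i = 0; i < index; i++) {
            if (!it.hasNext()) {
                throw new NoSuchElementException("fewer than " + (index + 1) + " elements");
            }
            it.next();
        }
        if (!it.hasNext()) {
            throw new NoSuchElementException("fewer than " + (index + 1) + " elements");
        }
        return it.next();
    }
}
```

Looking up many individual elements this way repeats the traversal each time, which is why the answer describes such access as expensive.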

Fabian Hueske answered May 01 '23 11:05