Flink state backend configuration with the State Processor API

I have been using the State Processor API since it was released to bootstrap my Flink state. I used a RocksDBStateBackend, and it worked. We recently upgraded to Flink 1.13, where RocksDBStateBackend is deprecated in favor of EmbeddedRocksDBStateBackend.

My issue:

Since switching to the new API and rewriting my bootstrap job accordingly, I get the following exception:

Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=85356498 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.

Here is how I declare my state backend:

val backend = new EmbeddedRocksDBStateBackend(true)

And here I create my savepoint:

  Savepoint
    .create(backend, MAX_PARALLELISM)
    .withOperator("my_operator", transformMyOperator)
    .write(savepointPath)

Also, my Flink cluster is configured to use the RocksDB state backend, and all other Flink topologies on it use RocksDB as well.

So why do I get an exception telling me not to use a memory-backed state backend when I am already using RocksDB? Any help would be welcome.

asked Jul 09 '21 09:07 by Fray


2 Answers

This is caused by a bug in Flink 1.13; see FLINK-23728. Running 1.14.0-RC0 solved the issue for me.

answered Sep 30 '22 18:09 by nomoa


In Flink 1.13 the selection of a state backend was decoupled from the selection of a checkpoint storage provider.

I'm guessing that previously you were relying on the RocksDBStateBackend constructor to specify where your checkpoints should be stored. Now you should configure this separately, either in flink-conf.yaml:

state.checkpoints.dir: file:///checkpoint-dir/

or in your code:

env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
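For reference, here is a minimal flink-conf.yaml sketch that sets the state backend and the checkpoint storage as two independent settings. The directory is the same placeholder used above. The `state.checkpoint-storage` key is an assumption worth verifying against the configuration reference for your exact Flink version, because when `state.checkpoints.dir` is set the file system storage is normally selected anyway. Incidentally, the maxSize=5242880 in the exception above is 5 MiB, which is the default size limit of the JobManager-based (memory) checkpoint storage.

```yaml
# flink-conf.yaml (Flink 1.13+): the state backend and the checkpoint
# storage are configured independently of each other.

# Where working state lives: RocksDB (EmbeddedRocksDBStateBackend)
state.backend: rocksdb
# Incremental checkpoints, the equivalent of new EmbeddedRocksDBStateBackend(true)
state.backend.incremental: true

# Where checkpoints are written: a file system directory instead of the
# JobManager heap. The storage key below is an assumption to verify.
state.checkpoint-storage: filesystem
state.checkpoints.dir: file:///checkpoint-dir/
```

With only `state.backend` set and no checkpoint directory, checkpoints fall back to the JobManager storage and its size limit, which is the kind of limit the exception reports.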

See "Migrating from Legacy Backends" in the Flink state backends documentation for more details.

answered Sep 30 '22 17:09 by David Anderson