I have been using the State Processor API since it was released to bootstrap my Flink state. I used a RocksDBStateBackend, and it worked. We recently upgraded to Flink 1.13, in which RocksDBStateBackend was deprecated in favor of EmbeddedRocksDBStateBackend.
My issue:
Since switching to the new API and writing a new bootstrap job, I get the following exception:
Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=85356498 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
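The two numbers in the message can be unpacked as follows. The limit of 5242880 bytes is exactly 5 MiB. As far as I know, this matches the default maximum state size of Flink's memory-backed (JobManager) state and checkpoint storage. That suggests the savepoint write is falling back to memory-backed storage even though a RocksDB backend was passed in. The state being written is roughly 16 times that limit:

```latex
\begin{aligned}
\text{maxSize} &= 5\,242\,880\ \text{B} = 5 \times 1024^2\ \text{B} = 5\ \text{MiB} \\
\text{Size}    &= 85\,356\,498\ \text{B} \approx 81.4\ \text{MiB} \\
\frac{\text{Size}}{\text{maxSize}} &= \frac{85\,356\,498}{5\,242\,880} \approx 16.3
\end{aligned}
```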
Here is how I declare my state backend:
val backend = new EmbeddedRocksDBStateBackend(true)
And here I create my savepoint:
Savepoint
  .create(backend, MAX_PARALLELISM)
  .withOperator("my_operator", transformMyOperator)
  .write(savepointPath)
Also, my Flink cluster is configured to use the RocksDB state backend, and all other Flink topologies use RocksDB as well.
So why do I get an exception telling me to avoid a memory-backed state backend when I am using RocksDB? Any help would be welcome.
This is caused by a bug in Flink 1.13; see FLINK-23728. Running 1.14.0-RC0 solved the issue for me.
In Flink 1.13, the choice of state backend was decoupled from the choice of checkpoint storage.
I'm guessing that you previously relied on the RocksDBStateBackend constructor to specify where your checkpoints should be stored. Now you should configure this separately, either in flink-conf.yaml:
state.checkpoints.dir: file:///checkpoint-dir/
or in your code:
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
See Migrating from Legacy Backends for more details.
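For reference, a flink-conf.yaml fragment that sets the state backend and the checkpoint storage as two independent settings might look like the sketch below. It assumes the Flink 1.13 option names. The `state.checkpoint-storage` key and the `incremental` setting are my additions and are not taken from the answer. `file:///checkpoint-dir/` is the same placeholder path used above.

```yaml
# State backend: where working state lives (1.13 shortcut names: hashmap, rocksdb)
state.backend: rocksdb
# Assumption: incremental checkpoints, mirroring EmbeddedRocksDBStateBackend(true)
state.backend.incremental: true
# Checkpoint storage, configured separately from the backend (assumed 1.13 key)
state.checkpoint-storage: filesystem
# Placeholder directory where checkpoints are written
state.checkpoints.dir: file:///checkpoint-dir/
```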