I have two questions on Flink externalized checkpoints
(Q1) I can set "state.checkpoints.dir" in flink-conf.yaml to get externalized checkpoints to work all right, but how do I achieve same thing when I run flink from IDE? I tried the GlobalConfiguration approach mentioned in (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/state-checkpoints-dir-td17921.html) but no luck. This is how I did it:
Configuration cfg =
GlobalConfiguration.loadConfiguration();
cfg.setString("state.checkpoints.dir", "file:///tmp/checkpoints/state");
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
and this is the error msg show in IDE:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Failed to submit job ef7050e2308a4787d983d80f3c07f55c (Long Taxi Rides (checkpointed))
at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1325)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:447)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured. You can configure configure one via key 'state.checkpoints.dir'.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:211)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:478)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:291)
at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1277)
... 19 more
Process finished with exit code 1
(Q2) In the checkpoint's document (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/checkpointing.html), it says "This way, you will have a checkpoint around to resume from if your job fails.", how about the cancelled jobs? will the new job carry on with the existing checkpoint or it will start with its own checkpoint?
You can control whether externalized checkpoints are deleted when the job is cancelled. If you want to retain them, you can do this:
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
For more info, see the docs.
To resume from an externalized checkpoint one does this (the same as resuming from a savepoint):
$ bin/flink run -s :checkpointMetaDataPath [:runArgs]
First question,create a local environment with custom config:
val conf = new Configuration()
conf.setString(CoreOptions.CHECKPOINTS_DIRECTORY, "file:///user/flink/checkpoint/storing/")
StreamExecutionEnvironment.createLocalEnvironment(4, conf)
Second question, as David said:
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With