Apache Flink: My application does not resume from a checkpoint when I restart it

I have a Flink job that reads files from a folder and writes them to a database. New files arrive in that folder daily.

I have enabled checkpointing so that if the Flink job stops for any reason and I have to restart it, it does not re-read the files that have already been read.

I added the lines below to my code, but when I restart the job, it reads all the files again.

env.setStateBackend(new FsStateBackend("file:///C://Users//folder"));
env.enableCheckpointing(10L);
asked Jan 23 '19 by Ankit


People also ask

How do I enable checkpoints in Flink?

Enabling and Configuring Checkpointing. By default, checkpointing is disabled. To enable checkpointing, call enableCheckpointing(n) on the StreamExecutionEnvironment, where n is the checkpoint interval in milliseconds.
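
For reference, a minimal runnable sketch of enabling checkpointing (the 60-second interval is illustrative; the 10 ms interval in the question is far too frequent for a real job):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnableCheckpointingExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Take a checkpoint every 60 seconds (illustrative value).
        env.enableCheckpointing(60_000L);
        // ... add sources, transformations, and sinks here, then call env.execute(...)
    }
}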

How do Flink checkpoints work?

Checkpoints make state in Flink fault tolerant by allowing state and the corresponding stream positions to be recovered, thereby giving the application the same semantics as a failure-free execution. See Checkpointing for how to enable and configure checkpoints for your program.

What is checkpoint and savepoint in Flink?

Conceptually, Flink's savepoints are different from checkpoints in a way that's analogous to how backups are different from recovery logs in traditional database systems. The primary purpose of checkpoints is to provide a recovery mechanism in case of unexpected job failures.

How does Flink guarantee exactly once processing?

Apache Flink guarantees exactly-once processing upon failure and recovery by resuming the job from a checkpoint, the checkpoint being a consistent snapshot of the distributed data stream and operator state (per the Chandy-Lamport algorithm for distributed snapshots). This guarantees exactly-once semantics upon failover.
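
For illustration, the checkpointing mode can be set explicitly when enabling checkpointing (EXACTLY_ONCE is already the default; the interval is illustrative):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExactlyOnceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Request exactly-once checkpoint semantics explicitly
        // (this is the default mode).
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
    }
}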


2 Answers

Checkpoints are a mechanism to recover from failures during the execution of an application, not to resume an application that was explicitly canceled.

If you have a running application and the execution fails (for whatever reason), Flink will try to recover the application by restarting it and initializing the operator state from the last checkpoint. If the recovery fails (for example, because not enough processing slots are available), the job is considered failed.

If you manually cancel an application and restart it, Flink will not use a checkpoint to initialize the operator state. In fact, Flink will (by default) delete all checkpoints when you cancel an application.

The concept you are looking for is the savepoint. Savepoints are very similar to checkpoints, but they are triggered manually by the user and are not automatically deleted when the application is explicitly canceled. When starting an application, you can start it from a savepoint, which means that the operator state is initialized from the savepoint.
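
As a side note, if you want checkpoints themselves to survive an explicit cancel, Flink also supports externalized (retained) checkpoints. A minimal sketch, with an illustrative interval:

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpointsExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L);
        // Keep completed checkpoints even when the job is canceled,
        // so they can be used to restore state on a manual restart.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }
}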

There are also different restart strategies available that configure how often and at what intervals Flink tries to restart a failed application.
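
For example, a fixed-delay restart strategy can be configured in code (the attempt count and delay below are illustrative):

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategyExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // On failure, retry the job up to 3 times, waiting 10 seconds
        // between attempts (both values are illustrative).
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3, Time.of(10, TimeUnit.SECONDS)));
    }
}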

answered Oct 17 '22 by Fabian Hueske


@fabian-hueske covered all aspects of what's going on with your "planned" restart.

You should plan to cancel the job with a savepoint:

flink cancel --withSavepoint ${SAVEPOINT_DIR} ${JOBID}

Then start the new job with the savepoint from the previous step:

flink run -s ${SAVE_POINT} -p ${PARALLELISM} -d ${JOB_JAR} ${JOB_ARGS}
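
For example, with hypothetical values for the savepoint directory, job ID, jar, and parallelism:

# Hypothetical values throughout; substitute your own savepoint dir, job ID, and jar.
flink cancel --withSavepoint /tmp/flink-savepoints a5af00b0f167b5b67d5a5e8a
flink run -s /tmp/flink-savepoints/savepoint-a5af00-0123456789ab -p 4 -d my-flink-job.jar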
answered Oct 17 '22 by Bon Speedy