I have a Flink job that reads files from a folder and writes them to a database. New files arrive in that folder daily.
I have enabled checkpointing so that if the Flink job stops for any reason and I need to restart it, the job does not re-read files that have already been read.
I added the lines below to my code, but when I restart the job, Flink reads all the files again.
env.setStateBackend(new FsStateBackend("file:///C://Users//folder"));
env.enableCheckpointing(10L);
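For reference, a minimal sketch of this kind of setup might look like the following (the input path, watch mode, and sink are placeholder assumptions for illustration, not the actual job):

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class FolderToDbJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Durable location for checkpoint data (hypothetical path)
        env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"));
        // Take a checkpoint every 10 seconds
        env.enableCheckpointing(10_000L);

        String inputDir = "file:///tmp/incoming"; // hypothetical input folder
        TextInputFormat format = new TextInputFormat(new Path(inputDir));

        // PROCESS_CONTINUOUSLY keeps monitoring the folder; the source's progress
        // (which files/splits have been read) is part of the checkpointed state
        DataStream<String> lines = env.readFile(
                format, inputDir, FileProcessingMode.PROCESS_CONTINUOUSLY, 10_000L);

        lines.print(); // stand-in for the database sink

        env.execute("folder-to-db");
    }
}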
Enabling and configuring checkpointing: by default, checkpointing is disabled. To enable checkpointing, call enableCheckpointing(n) on the StreamExecutionEnvironment, where n is the checkpoint interval in milliseconds.
Checkpoints make state in Flink fault tolerant by allowing state and the corresponding stream positions to be recovered, thereby giving the application the same semantics as a failure-free execution. See Checkpointing for how to enable and configure checkpoints for your program.
Conceptually, Flink's savepoints are different from checkpoints in a way that's analogous to how backups are different from recovery logs in traditional database systems. The primary purpose of checkpoints is to provide a recovery mechanism in case of unexpected job failures.
Apache Flink guarantees exactly-once processing upon failure and recovery by resuming the job from a checkpoint, where a checkpoint is a consistent snapshot of the distributed data stream and operator state (based on the Chandy-Lamport algorithm for distributed snapshots). This gives exactly-once semantics upon failover.
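As an illustration, checkpointing can be enabled in exactly-once mode on the StreamExecutionEnvironment; the interval and tuning values below are illustrative, not prescriptive:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExactlyOnceCheckpointing {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60 seconds with exactly-once guarantees
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

        // Leave some pause between checkpoints and cap how long one may take
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000L);
        env.getCheckpointConfig().setCheckpointTimeout(120_000L);
    }
}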
Checkpoints are a mechanism to recover from failures during the execution of an application, not to resume an application that was explicitly canceled.
If you have a running application and the execution fails (for whatever reason), Flink will try to recover the application by restarting it and initializing the state of the operators from the last checkpoint. If the recovery fails (for example because not enough processing slots are available), the job is considered failed.
If you manually cancel an application and restart it, Flink will not use a checkpoint to initialize the state of the operators. In fact, Flink will (by default) delete all checkpoints when you cancel an application.
The concept you are looking for is savepoints. Savepoints are very similar to checkpoints, but they are manually triggered by the user and not automatically deleted when the application is explicitly canceled. When starting an application, you can start it from a savepoint, which means that the operator state is initialized from the savepoint.
There are also different restart strategies available to configure how often and at which intervals Flink tries to restart a failed application.
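For example, a fixed-delay restart strategy can be set on the environment; the attempt count and delay below are illustrative:

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategyExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Restart a failed job up to 3 times, waiting 10 seconds between attempts
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
    }
}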
@fabian-hueske covered all aspects of what's going on with your "planned" restart.
You should plan to cancel the job with a savepoint:
flink cancel --withSavepoint ${SAVEPOINT_DIR} ${JOBID}
Then restart the job with the savepoint from the previous step (the cancel command prints the path where the savepoint was written):
flink run -s ${SAVE_POINT} -p ${PARALLELISM} -d ${JOB_JAR} ${JOB_ARGS}