I've set the checkpoint directory with the sc.setCheckpointDir method.
/checkpointDirectory/
I then created a checkpoint of an RDD with rdd.checkpoint(), and in the directory I now see a new subdirectory representing the checkpoint, named with a random string of letters and digits. Inside that directory there is nothing.
/checkpointDirectory/37d2812a-bca2-4fc5-b5d4-221ae03a6d25/ [empty]
Then, after doing a couple of transformations, I ran rdd.checkpoint() again, and there is still nothing in that recently created directory:
/checkpointDirectory/37d2812a-bca2-4fc5-b5d4-221ae03a6d25/ [empty]
Am I using checkpoint() wrong? What am I supposed to see in that directory to know it's working properly?
You mark an RDD for checkpointing by calling RDD.checkpoint(). The RDD will be saved to a file inside the checkpoint directory and all references to its parent RDDs will be removed.
To set the checkpoint directory, call SparkContext.setCheckpointDir(directory: String). When running on a cluster, the directory must be an HDFS path. Otherwise the driver tries to recover the checkpointed RDD from its own local file system, while the checkpoint files are actually on the executors' machines.
Checkpointing can be used to truncate the logical plan of a DataFrame, which is especially useful in iterative algorithms where the plan may grow exponentially. The DataFrame will be saved to files inside the checkpoint directory set with SparkContext.setCheckpointDir(). New in version 2.1.
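As a rough sketch of the iterative case described above, a loop can checkpoint every few iterations so the plan stays short. The helper name iterate_with_checkpoints and its parameters step and every are hypothetical, chosen only for illustration. The only Spark call assumed is DataFrame.checkpoint(), which returns a new DataFrame, and the checkpoint directory is assumed to have been set beforehand with SparkContext.setCheckpointDir().

```python
def iterate_with_checkpoints(df, step, n_iter, every=5):
    """Apply `step` n_iter times, checkpointing every `every` iterations.

    `df` is expected to be a pyspark.sql.DataFrame (or any object with a
    compatible checkpoint() method); `step` maps a DataFrame to a DataFrame.
    Both names are illustrative, not part of any Spark API.
    """
    if every < 1:
        raise ValueError("every must be >= 1")
    for i in range(1, n_iter + 1):
        df = step(df)
        if i % every == 0:
            # checkpoint() returns a *new* DataFrame with a truncated plan,
            # so the result has to be reassigned.
            df = df.checkpoint()
    return df
```

How often to checkpoint is a trade-off: each checkpoint writes the data out, so a small value of every keeps plans short at the cost of more I/O.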
While RDD offers low-level control over data, Dataset and DataFrame APIs bring structure and high-level abstractions. Keep in mind that transformations from an RDD to a Dataset or DataFrame are easy to execute.
checkpoint, like many other operations in Spark, is lazy. Data is actually checkpointed if and only if a given RDD is materialized. The empty directory you see is the application-specific checkpoint directory.
If you want the checkpoint to take place, you have to trigger an action that evaluates the corresponding RDD. For example (local mode):
import glob
import os
from urllib.parse import urlparse
sc.setCheckpointDir("/tmp/checkpoints/")
ch_dir = os.path.join(urlparse(sc._jsc.getCheckpointDir().orElse("")).path, "*")
rdd = sc.range(1000, 10)
plus_one = rdd.map(lambda x: x + 1)
plus_one.cache()
plus_one.checkpoint() # No checkpoint dir here yet
[os.path.split(x)[-1] for x in glob.glob(ch_dir)]
## []
plus_one.isCheckpointed()
## False
# After count is executed you'll see rdd specific checkpoint dir
plus_one.count()
[os.path.split(x)[-1] for x in glob.glob(ch_dir)]
## ['rdd-1']
plus_one.isCheckpointed()
## True
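The glob calls above only list the top level of the application's checkpoint directory. To answer the "what am I supposed to see" part more directly, here is a small standard-library helper that walks a checkpoint root and reports how many files each RDD directory holds. It is a sketch that assumes each checkpointed RDD gets its own rdd-<id> subdirectory, as in the output above; summarize_checkpoint_dir is a hypothetical name, not a Spark function.

```python
import os


def summarize_checkpoint_dir(root):
    """Map each 'rdd-*' directory under `root` to the number of files in it.

    Keys are paths relative to `root`. Hidden files (names starting
    with ".") are not counted.
    """
    summary = {}
    for dirpath, _dirnames, filenames in os.walk(root):
        if os.path.basename(dirpath).startswith("rdd-"):
            rel = os.path.relpath(dirpath, root)
            summary[rel] = sum(1 for f in filenames if not f.startswith("."))
    return summary
```

Calling summarize_checkpoint_dir("/tmp/checkpoints/") before any action should give an empty dict. After plus_one.count() it should contain an entry whose path ends in rdd-1.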
You can also inspect the debug string (plus_one.toDebugString()) before an action:
## (8) PythonRDD[1] at RDD at PythonRDD.scala:48 [Memory Serialized 1x Replicated]
## | ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:475 [Memory Serialized 1x Replicated]
and after an action:
## (8) PythonRDD[1] at RDD at PythonRDD.scala:48 [Memory Serialized 1x Replicated]
## | CachedPartitions: 8; MemorySize: 168.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
## | ReliableCheckpointRDD[3] at count at <ipython-input-16-96e746c56973>:1 [Memory Serialized 1x Replicated]
As you can see, before the action the RDD would be computed from scratch, but after count you get a ReliableCheckpointRDD.
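To make the control flow concrete, here is a toy model in plain Python. It is not Spark code and does not reflect Spark's implementation; ToyRDD and all of its methods are made up for illustration. It only mimics the two behaviours described above: checkpoint() merely records the intent, and the first action writes the data and drops the link to the parent.

```python
import json
import os
import tempfile


class ToyRDD:
    """Toy stand-in for an RDD: same idea, none of Spark's machinery."""

    def __init__(self, compute, parent=None):
        self._compute = compute        # function: parent data -> own data
        self.parent = parent           # lineage link
        self._checkpoint_path = None   # set by checkpoint(); nothing written yet
        self._checkpointed = False

    def map(self, f):
        return ToyRDD(lambda data: [f(x) for x in data], parent=self)

    def checkpoint(self, path):
        # Lazy: only remember where the data should go.
        self._checkpoint_path = path

    def is_checkpointed(self):
        return self._checkpointed

    def _materialize(self):
        if self._checkpointed:
            # Read back from the checkpoint file instead of recomputing.
            with open(self._checkpoint_path) as fh:
                return json.load(fh)
        parent_data = self.parent._materialize() if self.parent else None
        return self._compute(parent_data)

    def collect(self):
        # The "action": this is the point where data is actually produced.
        data = self._materialize()
        if self._checkpoint_path and not self._checkpointed:
            with open(self._checkpoint_path, "w") as fh:
                json.dump(data, fh)
            self._checkpointed = True
            self.parent = None         # lineage is truncated
        return data


path = os.path.join(tempfile.mkdtemp(), "rdd-1.json")
plus_one = ToyRDD(lambda _: list(range(5))).map(lambda x: x + 1)
plus_one.checkpoint(path)
print(os.path.exists(path), plus_one.is_checkpointed())  # False False
print(plus_one.collect())                                # [1, 2, 3, 4, 5]
print(os.path.exists(path), plus_one.is_checkpointed())  # True True
```

One difference worth keeping in mind: in Spark the checkpoint is written by a separate job that runs after the action, so an uncached RDD is computed twice. That is why the example above calls plus_one.cache() before plus_one.checkpoint().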