According to the documentation, it is possible to tell Spark to keep track of "out of scope" checkpoints - those that are no longer needed - and clean them from disk:
SparkSession.builder
...
.config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
.getOrCreate()
Apparently it does so, but the problem is that the last checkpointed RDDs are never deleted.
Is there any configuration I am missing to perform all the cleanup? If there isn't: is there any way to get the name of the temporary folder created for a particular application so I can programmatically delete it? I.e. get 0c514fb8-498c-4455-b147-aff242bd7381 from SparkContext, the same way you can get the applicationId.
There are two types of Apache Spark checkpointing. Reliable checkpointing saves the actual RDD to reliable distributed storage such as HDFS; you need to call SparkContext.setCheckpointDir(directory: String) to set the checkpoint directory. Local checkpointing, by contrast, persists the RDD to executor storage, which is faster but not fault tolerant.
Checkpointing is a feature of Spark Core (which Spark SQL uses for distributed computations) that allows a driver to be restarted on failure with the previously computed state of a distributed computation, described as an RDD.
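For reference, a minimal reliable-checkpointing sketch in PySpark could look like the following; the path and the RDD contents are placeholders, not details from the question:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("checkpoint-demo").getOrCreate()
sc = spark.sparkContext

# Reliable checkpointing: the RDD is written to the configured directory on HDFS
sc.setCheckpointDir("hdfs:///tmp/checkpoint")  # placeholder path

rdd = sc.parallelize(range(100)).map(lambda x: x * 2)
rdd.checkpoint()  # mark the RDD for checkpointing
rdd.count()       # an action triggers the actual write to the checkpoint directory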
I know it's an old question, but I was recently exploring checkpointing and ran into similar problems, so I would like to share my findings.
Question: Is there any configuration I am missing to perform all the cleanup?
Setting spark.cleaner.referenceTracking.cleanCheckpoints=true works sometimes, but it is hard to rely on it. The official documentation says that setting this property will
"clean checkpoint files if the reference is out of scope".
I don't know exactly what that means, because my understanding is that once the Spark session/context is stopped, it should clean them up.
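To illustrate what "the reference is out of scope" seems to mean in practice, here is a hedged PySpark sketch (paths and names are placeholders): the property is set when the session is built, an RDD is checkpointed, and the driver-side reference is then dropped so the ContextCleaner can remove the checkpoint files once the object is garbage collected. There is no guarantee this happens before the application exits, which matches the unreliable behaviour described above.

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("cleaner-demo")
         # enable cleanup of out-of-scope checkpoint files
         .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
         .getOrCreate())
sc = spark.sparkContext
sc.setCheckpointDir("hdfs:///tmp/checkpoint")  # placeholder path

rdd = sc.parallelize(range(1000))
rdd.checkpoint()
rdd.count()  # materialize the checkpoint on HDFS

# Drop the only driver-side reference; the ContextCleaner may delete the
# checkpoint files after the object is garbage collected, but it may also
# never get the chance before the application stops.
rdd = None
import gc
gc.collect()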
However, I found an answer to your other question:
If there isn't: Is there any way to get the name of the temporary folder created for a particular application so I can programmatically delete it? I.e. get 0c514fb8-498c-4455-b147-aff242bd7381 from SparkContext the same way you can get the applicationId
Yes, we can get the checkpoint directory as shown below.
Scala:
// Set the checkpoint directory
scala> spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoint/")
scala> spark.sparkContext.getCheckpointDir.get
res3: String = hdfs://<name-node:port>/tmp/checkpoint/625034b3-c6f1-4ab2-9524-e48dfde589c3
// It returns a String, so we can use org.apache.hadoop.fs to delete the path
PySpark:
# Set the checkpoint directory
>>> spark.sparkContext.setCheckpointDir('hdfs:///tmp/checkpoint')
>>> t = sc._jsc.sc().getCheckpointDir().get()
>>> t
u'hdfs://<name-node:port>/tmp/checkpoint/dc99b595-f8fa-4a08-a109-23643e2325ca'
# Notice the 'u' prefix: the call returns a unicode object
# Get a Hadoop FileSystem object and delete the checkpoint path
>>> fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
>>> fs.exists(sc._jvm.org.apache.hadoop.fs.Path(str(t)))
True
>>> fs.delete(sc._jvm.org.apache.hadoop.fs.Path(str(t)), True)  # True = recursive delete
True
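Putting this together, one option is to delete the checkpoint directory explicitly at the end of the script, just before stopping the session. The helper below is a hypothetical sketch built from the same py4j calls used above (the name remove_checkpoint_dir is my own):

def remove_checkpoint_dir(spark):
    # Hypothetical helper: delete this application's checkpoint directory, if one was set
    sc = spark.sparkContext
    checkpoint_dir = sc._jsc.sc().getCheckpointDir()  # scala.Option[String]
    if checkpoint_dir.isDefined():
        path = sc._jvm.org.apache.hadoop.fs.Path(checkpoint_dir.get())
        fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
        if fs.exists(path):
            fs.delete(path, True)  # True = recursive delete

# At the end of the script:
remove_checkpoint_dir(spark)
spark.stop()

Note that this relies on private attributes (sc._jsc, sc._jvm), just like the REPL session above, so it may need adjusting between Spark versions.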