I have a DataFrame in Spark into which an entire partition from Hive has been loaded, and I need to break the lineage so that I can overwrite the same partition after some modifications to the data. However, when the Spark job is done, I am left with the checkpoint data on HDFS. Why does Spark not clean this up by itself, or is there something I am missing?
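import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

spark.sparkContext.setCheckpointDir("/home/user/checkpoint/")
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
// Load only the partition that will be rewritten
val df = spark.table("db.my_table").filter(col("partition") === 2)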
// ... transformations to the dataframe
val checkpointDf = df.checkpoint()
checkpointDf.write.format("parquet").mode(SaveMode.Overwrite).insertInto("db.my_table")
After this I have this file on HDFS:
/home/user/checkpoint/214797f2-ce2e-4962-973d-8f215e5d5dd8/rdd-23/part-00000
Each time I run the Spark job, I get a new directory with a new unique ID, containing files for each RDD that was in the DataFrames.
With the cache() and persist() methods, Spark provides an optimization mechanism that stores the intermediate computation of a Spark DataFrame so it can be reused in subsequent actions. When you persist a dataset, each node stores the partitions it computes (in memory or on disk, depending on the storage level) and reuses them in other actions on that dataset.
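To make the difference from checkpointing concrete, here is a minimal sketch of persist() and unpersist(). It is not from the original question: the local master, the application name, and the stand-in data built with spark.range are assumptions for illustration. Note that caching keeps the lineage of the DataFrame, whereas checkpoint() truncates it, so caching alone does not replace the checkpoint in the question.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Assumption: a local session for illustration; in spark-shell the existing session is reused.
val spark = SparkSession.builder()
  .appName("persist-sketch")
  .master("local[*]")
  .getOrCreate()

// Stand-in data (hypothetical), not the Hive table from the question.
val evens = spark.range(0, 1000).toDF("id")
  .filter("id % 2 = 0")
  .persist(StorageLevel.MEMORY_AND_DISK)

// The first action materializes the cached partitions.
val n = evens.count()
// The second action reuses them instead of recomputing the filter.
val maxId = evens.agg(Map("id" -> "max")).first().getLong(0)

// Release the cached blocks once they are no longer needed.
evens.unpersist()
```

Unlike checkpoint files, persisted blocks live in executor storage and are released by unpersist() or when the application ends, so nothing is left behind in the checkpoint directory.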
If any data is lost, recovery should be fast. Spark Streaming accomplishes this using checkpointing. Checkpointing is a process that truncates the RDD lineage graph and periodically saves the application state to reliable storage such as HDFS.
What is a Spark Streaming checkpoint? Checkpointing is the process of writing received records to HDFS at checkpoint intervals. A streaming application must operate 24/7, so it must be resilient to failures unrelated to the application logic, such as system failures and JVM crashes.
Spark has a built-in mechanism for cleaning up checkpoint files, but it is disabled by default.
To enable it, add this property to spark-defaults.conf:
spark.cleaner.referenceTracking.cleanCheckpoints true #Default is false
You can find more about Spark configuration on the official Spark configuration page.
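If editing spark-defaults.conf is not an option, the same property can probably be set when the session is built. The sketch below assumes a fresh application (no SparkContext exists yet); the application name and local master are placeholders. As far as I can tell, the property is read from the SparkContext's configuration, so setting it later with spark.conf.set is unlikely to have any effect.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: no SparkContext exists yet, so the builder config reaches the SparkConf.
val spark = SparkSession.builder()
  .appName("checkpoint-cleanup-sketch")   // hypothetical name
  .master("local[*]")                     // assumption: drop this when using spark-submit
  .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
  .getOrCreate()

// Read the value back from the context's configuration to confirm it was applied.
val cleanEnabled = spark.sparkContext.getConf
  .getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)
println(s"cleanCheckpoints enabled: $cleanEnabled")
```

Keep in mind that the cleaner only removes checkpoint files once the checkpointed data is no longer referenced and has been garbage collected on the driver, so a short job may still exit with files left in the checkpoint directory.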
If you want to remove the checkpoint directory from HDFS yourself, you can delete it at the end of your script, for example with Python's rmtree (note that shutil.rmtree operates on local filesystem paths, so it only works when the checkpoint directory is reachable as one).
Setting the property spark.cleaner.referenceTracking.cleanCheckpoints to true allows the cleaner to remove old checkpoint files inside the checkpoint directory.
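Since the question's code is Scala, manual cleanup could also be sketched with the Hadoop FileSystem API, which works for HDFS paths as well as local ones. This is only an illustration under assumptions: the temporary scratch directory, the stand-in DataFrame, and the local master are hypothetical and not part of the original job. It relies on SparkContext.getCheckpointDir, which returns the per-application subdirectory Spark created under the configured checkpoint directory.

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("checkpoint-delete-sketch")   // hypothetical name
  .master("local[*]")                    // assumption: local run for illustration
  .getOrCreate()
val sc = spark.sparkContext

// Assumption: a scratch directory; Spark creates a uniquely named subdirectory under it.
val baseDir = java.nio.file.Files.createTempDirectory("checkpoint-sketch").toString
sc.setCheckpointDir(baseDir)

// Stand-in for the transformed DataFrame; checkpoint() is eager by default.
val checkpointDf = spark.range(0, 100).toDF("id").checkpoint()
val rows = checkpointDf.count()
// ... write checkpointDf to its destination here, before deleting anything ...

// Delete this application's checkpoint subdirectory recursively.
// After this, checkpointDf can no longer be read, so do it last.
val deleted = sc.getCheckpointDir.map { dir =>
  val path = new Path(dir)
  val fs = path.getFileSystem(sc.hadoopConfiguration)
  fs.delete(path, true)
}.getOrElse(false)
```

Deleting only the directory returned by getCheckpointDir, rather than the whole base directory, avoids removing checkpoints that belong to other applications sharing the same location.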