
Spark dataframe checkpoint cleanup

I have a DataFrame in Spark into which an entire Hive partition has been loaded, and I need to break the lineage so I can overwrite that same partition after some modifications to the data. However, when the Spark job is done, I am left with the checkpoint data on HDFS. Why doesn't Spark clean this up by itself, or is there something I am missing?

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

spark.sparkContext.setCheckpointDir("/home/user/checkpoint/")
// Only overwrite the partitions present in the written data, not the whole table
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

val df = spark.table("db.my_table").filter(col("partition") === 2)

// ... transformations to the dataframe

// checkpoint() materializes the data and truncates the lineage
val checkpointDf = df.checkpoint()
checkpointDf.write.mode(SaveMode.Overwrite).insertInto("db.my_table")

After this, I am left with this file on HDFS:

/home/user/checkpoint/214797f2-ce2e-4962-973d-8f215e5d5dd8/rdd-23/part-00000

Each time I run the Spark job, I just get a new directory with a new unique ID, containing files for each RDD that was checkpointed from the dataframes.

asked Jan 31 '20 by aweis


People also ask

How do you cache a DataFrame in PySpark?

Using the cache() and persist() methods, Spark provides an optimization mechanism to store the intermediate computation of a Spark DataFrame so it can be reused in subsequent actions. When you persist a dataset, each node stores its partitioned data in memory and reuses it in other actions on that dataset.
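As a minimal sketch in Scala (the question mentions PySpark, but the API is the same in both; the table names are placeholders):

import org.apache.spark.storage.StorageLevel

// cache() is shorthand for persist(StorageLevel.MEMORY_AND_DISK) on a DataFrame
val cached = spark.table("db.my_table").cache()
cached.count()   // first action materializes the cache
cached.count()   // later actions reuse the cached partitions

// persist() lets you choose the storage level explicitly
val persisted = spark.table("db.other_table").persist(StorageLevel.DISK_ONLY)

cached.unpersist()      // release the cached data when done
persisted.unpersist()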

How do I delete a checkpoint folder?

If you want to remove the checkpoint directory from HDFS, you can delete it yourself at the end of your script, for example with Python's shutil.rmtree for a local path. Alternatively, the property spark.cleaner.referenceTracking.cleanCheckpoints lets Spark clean up checkpoint files itself (see the answer below).

What is the advantage of checkpointing in Spark?

If any data is lost, recovery should be speedy; Spark Streaming accomplishes this using checkpointing. Checkpointing is a process that truncates the RDD lineage graph and periodically saves the application state to reliable storage (HDFS).
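A small sketch of that lineage truncation in Scala (the directory is a placeholder; toDebugString just prints the lineage):

spark.sparkContext.setCheckpointDir("/tmp/checkpoints")  // placeholder directory

val longLineage = spark.range(1000000).filter("id % 2 = 0").selectExpr("id * 2 AS id")
println(longLineage.rdd.toDebugString)  // full chain of parent RDDs

val truncated = longLineage.checkpoint()  // eager by default: runs a job and writes the data
println(truncated.rdd.toDebugString)      // lineage now starts at the checkpoint files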

What is checkpointing in Spark Streaming?

Checkpointing in Spark Streaming is the process of writing received records to HDFS at checkpoint intervals. A streaming application must operate 24/7 and hence must be resilient to failures unrelated to the application logic, such as system failures or JVM crashes.
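That describes classic DStream-style checkpointing; in Structured Streaming, the same idea is a per-query checkpoint location. A hedged sketch (the rate source is a built-in test source, and the path is a placeholder):

// Spark manages the contents of the checkpoint location (offsets and state)
// so the query can be restarted after a failure without data loss.
val stream = spark.readStream
  .format("rate")   // built-in test source that emits timestamped rows
  .load()

val query = stream.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/stream-checkpoint")  // placeholder path
  .start()

query.awaitTermination()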


1 Answer

Spark has an implicit mechanism for cleaning up checkpoint files.

Add this property to spark-defaults.conf:

spark.cleaner.referenceTracking.cleanCheckpoints  true   # default is false
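If editing spark-defaults.conf is not an option, a sketch of setting it when the session is built; note that the cleaner reads this when the SparkContext is created, so spark.conf.set on an already-running session comes too late (the app name is a placeholder):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("checkpoint-cleanup")  // placeholder app name
  // must be set before the SparkContext exists; read once at startup
  .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
  .getOrCreate()

The same flag can also be passed on the command line with spark-submit --conf spark.cleaner.referenceTracking.cleanCheckpoints=true.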

You can find more about Spark configuration on the official Spark configuration page: https://spark.apache.org/docs/latest/configuration.html

If you want to remove the checkpoint directory from HDFS yourself, you can delete it at the end of your script, for example with Python's shutil.rmtree for a local directory or the Hadoop FileSystem API for HDFS.

Setting spark.cleaner.referenceTracking.cleanCheckpoints to true allows the cleaner to remove old checkpoint files inside the checkpoint directory.
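A sketch of that manual cleanup in Scala instead, using the Hadoop FileSystem API (the path matches the checkpoint directory from the question):

import org.apache.hadoop.fs.{FileSystem, Path}

// Resolve the filesystem the path belongs to (HDFS or local) and delete recursively
val checkpointPath = new Path("/home/user/checkpoint/")
val fs = checkpointPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
if (fs.exists(checkpointPath)) {
  fs.delete(checkpointPath, true)  // true = recursive
}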

answered Oct 29 '22 by ggeop