
Spark: efficiency of dataframe checkpoint vs. explicitly writing to disk

Checkpoint version:

val savePath = "/some/path"
spark.sparkContext.setCheckpointDir(savePath)
// checkpoint() returns a new DataFrame; keep the result
val checkpointed = df.checkpoint()

Write to disk version:

df.write.parquet(savePath)
// use a new name so the original df is not redefined
val reloaded = spark.read.parquet(savePath)

I think both break the lineage in the same way.

In my experiments checkpoint is almost 30 times bigger on disk than parquet (689GB vs. 24GB). In terms of running time, checkpoint takes about 1.4 times longer (10.5 min vs. 7.5 min).

Considering all this, what would be the point of using checkpoint instead of saving to file? Am I missing something?

germanium asked Aug 09 '18 17:08



1 Answer

Checkpointing truncates the RDD lineage graph and saves the RDD to a reliable distributed file system (such as HDFS) or to a local file system. If you have a large RDD lineage graph and want to freeze the content of the current RDD, i.e. materialize the complete RDD before proceeding to the next step, you generally use persist or checkpoint. The checkpointed RDD can then be used for other purposes.
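As a rough illustration of the paragraph above, here is a minimal sketch that checkpoints a small DataFrame in local mode. It assumes Spark 2.1 or later; the object name `CheckpointSketch`, the temporary directory, and the toy transformations are made up for the example and are not from the question.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Hypothetical example object; names and data are illustrative only.
object CheckpointSketch {
  // Checkpoints a small DataFrame and returns the row counts
  // of the original and of the checkpointed DataFrame.
  def run(spark: SparkSession): (Long, Long) = {
    // Assumption: a local temp directory stands in for a reliable store such as HDFS.
    val checkpointDir = Files.createTempDirectory("spark-checkpoints").toString
    spark.sparkContext.setCheckpointDir(checkpointDir)

    val df = spark.range(0, 1000).toDF("id")
      .withColumn("doubled", col("id") * 2)
      .filter(col("id") % 3 === 0)

    // checkpoint() is eager by default: it runs a job, writes the data
    // under the checkpoint directory, and returns a NEW DataFrame.
    val checkpointed = df.checkpoint()

    // Print both plans so the difference in lineage can be inspected by eye.
    df.explain()
    checkpointed.explain()

    (df.count(), checkpointed.count())
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("checkpoint-sketch")
      .getOrCreate()
    try println(run(spark)) finally spark.stop()
  }
}
```

The plan printed for `checkpointed` should start from a scan of the already materialized data rather than from the original chain of transformations, which is what "truncating the lineage" means in practice.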

When you checkpoint, the RDD is serialized and stored on disk. It is not stored in Parquet format, so the data is not storage-optimized on disk, in contrast to Parquet, which applies compression and encoding to reduce the stored size. This would explain the difference in size.

  • You should definitely think about checkpointing in a noisy cluster. A cluster is called noisy if there are lots of jobs and users which compete for resources and there are not enough resources to run all the jobs simultaneously.

  • You must think about checkpointing if your computations are really expensive and take a long time to finish, because it could be faster to write an RDD to HDFS and read it back in parallel than to recompute it from scratch.

There was also a slight inconvenience prior to the Spark 2.1 release: there was no way to checkpoint a DataFrame, so you had to checkpoint the underlying RDD. This has been resolved in Spark 2.1 and later versions.
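For reference, checkpointing the underlying RDD could look roughly like the sketch below. The helper name `checkpointViaRdd` is hypothetical, and the code assumes a `SparkSession` (Spark 2.0) whose checkpoint directory has already been set with `setCheckpointDir`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper; not part of the Spark API.
object RddCheckpointSketch {
  // Assumes spark.sparkContext.setCheckpointDir(...) was called beforehand;
  // otherwise rows.checkpoint() fails.
  def checkpointViaRdd(spark: SparkSession, df: DataFrame): DataFrame = {
    // Keep one reference to the underlying RDD[Row].
    val rows = df.rdd
    // checkpoint() only marks the RDD; nothing is written yet.
    rows.checkpoint()
    // An action triggers the actual write to the checkpoint directory.
    rows.count()
    // Rebuild a DataFrame on top of the checkpointed RDD with the same schema.
    spark.createDataFrame(rows, df.schema)
  }
}
```

On Spark 2.1 and later, `df.checkpoint()` does this in one call, so a helper like this is only relevant for older versions.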

The problem with saving to disk as Parquet and reading it back is that:

  • It could be inconvenient in coding. You need to save and read multiple times.
  • It could slow down the job overall, because when you save as Parquet and read it back the DataFrame has to be reconstructed.
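The coding inconvenience can be reduced by wrapping the save and the read in one function. The following is only a sketch: `materialize` is a hypothetical helper name, and overwriting the target path is an assumption that may not suit every job.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper; not part of the Spark API.
object ParquetRoundTrip {
  // Writes df to `path` as Parquet and returns a DataFrame that reads from
  // those files, so its plan no longer contains the original transformations.
  def materialize(spark: SparkSession, df: DataFrame, path: String): DataFrame = {
    // Assumption: overwriting any existing data at `path` is acceptable.
    // Do not point `path` at a location that df itself is reading from.
    df.write.mode("overwrite").parquet(path)
    spark.read.parquet(path)
  }
}
```

A call such as `val reloaded = ParquetRoundTrip.materialize(spark, df, "/some/path")` then plays the same role as `df.checkpoint()`, with the files staying in Parquet format at a path you choose.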

This wiki could be useful for further investigation

As presented in the dataset checkpointing wiki

Checkpointing is actually a feature of Spark Core (that Spark SQL uses for distributed computations) that allows a driver to be restarted on failure with previously computed state of a distributed computation described as an RDD. That has been successfully used in Spark Streaming - the now-obsolete Spark module for stream processing based on RDD API.

Checkpointing truncates the lineage of a RDD to be checkpointed. That has been successfully used in Spark MLlib in iterative machine learning algorithms like ALS.

Dataset checkpointing in Spark SQL uses checkpointing to truncate the lineage of the underlying RDD of a Dataset being checkpointed.

Avishek Bhattacharya answered Oct 18 '22 05:10
