Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

What is the difference between spark checkpoint and persist to a disk

Tags:

apache-spark

What is the difference between spark checkpoint and persist to a disk. Are both these store in the local disk?

like image 463
nagendra Avatar asked Feb 01 '16 10:02

nagendra


People also ask

What does persist () do in Spark?

Spark RDD persistence is an optimization technique which saves the result of RDD evaluation in cache memory. Using this we save the intermediate result so that we can use it further if required. It reduces the computation overhead.

What is a checkpoint in Spark?

So, Checkpointing is a process to truncate RDD lineage graph. It saves the application state timely to reliable storage (HDFS). As the driver restarts the recovery takes place. There are two types of data that we checkpoint in Spark: Metadata Checkpointing – Metadata means the data about data.

What is the difference between cache () and persist ()?

The only difference between cache() and persist() is ,using Cache technique we can save intermediate results in memory only when needed while in Persist() we can save the intermediate results in 5 storage levels(MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, DISK_ONLY).

What is persist and Unpersist in Spark?

Unpersist syntax and Example Spark automatically monitors every persist() and cache() calls you make and it checks usage on each node and drops persisted data if not used or by using the least-recently-used (LRU) algorithm. You can also manually remove using unpersist() method.


2 Answers

There are few important differences but the fundamental one is what happens with lineage. Persist / cache keeps lineage intact while checkpoint breaks lineage. Lets consider following examples:

import org.apache.spark.storage.StorageLevel  val rdd = sc.parallelize(1 to 10).map(x => (x % 3, 1)).reduceByKey(_ + _) 
  • cache / persist:

    val indCache  = rdd.mapValues(_ > 4) indCache.persist(StorageLevel.DISK_ONLY)  indCache.toDebugString // (8) MapPartitionsRDD[13] at mapValues at <console>:24 [Disk Serialized 1x Replicated] //  |  ShuffledRDD[3] at reduceByKey at <console>:21 [Disk Serialized 1x Replicated] //  +-(8) MapPartitionsRDD[2] at map at <console>:21 [Disk Serialized 1x Replicated] //     |  ParallelCollectionRDD[1] at parallelize at <console>:21 [Disk Serialized 1x Replicated]  indCache.count // 3  indCache.toDebugString // (8) MapPartitionsRDD[13] at mapValues at <console>:24 [Disk Serialized 1x Replicated] //  |       CachedPartitions: 8; MemorySize: 0.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 587.0 B //  |  ShuffledRDD[3] at reduceByKey at <console>:21 [Disk Serialized 1x Replicated] //  +-(8) MapPartitionsRDD[2] at map at <console>:21 [Disk Serialized 1x Replicated] //     |  ParallelCollectionRDD[1] at parallelize at <console>:21 [Disk Serialized 1x Replicated] 
  • checkpoint:

    val indChk  = rdd.mapValues(_ > 4) indChk.checkpoint  indChk.toDebugString // (8) MapPartitionsRDD[11] at mapValues at <console>:24 [] //  |  ShuffledRDD[3] at reduceByKey at <console>:21 [] //  +-(8) MapPartitionsRDD[2] at map at <console>:21 [] //     |  ParallelCollectionRDD[1] at parallelize at <console>:21 []  indChk.count // 3  indChk.toDebugString // (8) MapPartitionsRDD[11] at mapValues at <console>:24 [] //  |  ReliableCheckpointRDD[12] at count at <console>:27 [] 

As you can see, in the first case lineage is preserved even if data is fetched from the cache. It means that data can be recomputed from scratch if some partitions of indCache are lost. In the second case lineage is completely lost after the checkpoint and indChk doesn't carry an information required to rebuild it anymore.

checkpoint, unlike cache / persist is computed separately from other jobs. That's why RDD marked for checkpointing should be cached:

It is strongly recommended that this RDD is persisted in memory, otherwise saving it on a file will require recomputation.

Finally checkpointed data is persistent and not removed after SparkContext is destroyed.

Regarding data storage SparkContext.setCheckpointDir used by RDD.checkpoint requires DFS path if running in non-local mode. Otherwise it can be local files system as well. localCheckpoint and persist without replication should use local file system.

Important Note:

RDD checkpointing is a different concept than a chekpointing in Spark Streaming. The former one is designed to address lineage issue, the latter one is all about streaming reliability and failure recovery.

like image 180
zero323 Avatar answered Oct 03 '22 07:10

zero323


I think you can find a very detailed answer here

While it is very hard to summarize all in that page, I will say

Persist

  • Persisting or caching with StorageLevel.DISK_ONLY cause the generation of RDD to be computed and stored in a location such that subsequent use of that RDD will not go beyond that points in recomputing the linage.
  • After persist is called, Spark still remembers the lineage of the RDD even though it doesn't call it.
  • Secondly, after the application terminates, the cache is cleared or file destroyed

Checkpointing

  • Checkpointing stores the rdd physically to hdfs and destroys the lineage that created it.
  • The checkpoint file won't be deleted even after the Spark application terminated.
  • Checkpoint files can be used in subsequent job run or driver program
  • Checkpointing an RDD causes double computation because the operation will first call a cache before doing the actual job of computing and writing to the checkpoint directory.

You may want to read the article for more of the details or internals of Spark's checkpointing or Cache operations.

like image 29
okmich Avatar answered Oct 03 '22 08:10

okmich