What is the difference between spark checkpoint and persist to a disk. Are both these store in the local disk?
Spark RDD persistence is an optimization technique which saves the result of RDD evaluation in cache memory. Using this we save the intermediate result so that we can use it further if required. It reduces the computation overhead.
So, Checkpointing is a process to truncate RDD lineage graph. It saves the application state timely to reliable storage (HDFS). As the driver restarts the recovery takes place. There are two types of data that we checkpoint in Spark: Metadata Checkpointing – Metadata means the data about data.
The only difference between cache() and persist() is ,using Cache technique we can save intermediate results in memory only when needed while in Persist() we can save the intermediate results in 5 storage levels(MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, DISK_ONLY).
Unpersist syntax and Example Spark automatically monitors every persist() and cache() calls you make and it checks usage on each node and drops persisted data if not used or by using the least-recently-used (LRU) algorithm. You can also manually remove using unpersist() method.
There are few important differences but the fundamental one is what happens with lineage. Persist
/ cache
keeps lineage intact while checkpoint
breaks lineage. Lets consider following examples:
import org.apache.spark.storage.StorageLevel val rdd = sc.parallelize(1 to 10).map(x => (x % 3, 1)).reduceByKey(_ + _)
cache
/ persist
:
val indCache = rdd.mapValues(_ > 4) indCache.persist(StorageLevel.DISK_ONLY) indCache.toDebugString // (8) MapPartitionsRDD[13] at mapValues at <console>:24 [Disk Serialized 1x Replicated] // | ShuffledRDD[3] at reduceByKey at <console>:21 [Disk Serialized 1x Replicated] // +-(8) MapPartitionsRDD[2] at map at <console>:21 [Disk Serialized 1x Replicated] // | ParallelCollectionRDD[1] at parallelize at <console>:21 [Disk Serialized 1x Replicated] indCache.count // 3 indCache.toDebugString // (8) MapPartitionsRDD[13] at mapValues at <console>:24 [Disk Serialized 1x Replicated] // | CachedPartitions: 8; MemorySize: 0.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 587.0 B // | ShuffledRDD[3] at reduceByKey at <console>:21 [Disk Serialized 1x Replicated] // +-(8) MapPartitionsRDD[2] at map at <console>:21 [Disk Serialized 1x Replicated] // | ParallelCollectionRDD[1] at parallelize at <console>:21 [Disk Serialized 1x Replicated]
checkpoint
:
val indChk = rdd.mapValues(_ > 4) indChk.checkpoint indChk.toDebugString // (8) MapPartitionsRDD[11] at mapValues at <console>:24 [] // | ShuffledRDD[3] at reduceByKey at <console>:21 [] // +-(8) MapPartitionsRDD[2] at map at <console>:21 [] // | ParallelCollectionRDD[1] at parallelize at <console>:21 [] indChk.count // 3 indChk.toDebugString // (8) MapPartitionsRDD[11] at mapValues at <console>:24 [] // | ReliableCheckpointRDD[12] at count at <console>:27 []
As you can see, in the first case lineage is preserved even if data is fetched from the cache. It means that data can be recomputed from scratch if some partitions of indCache
are lost. In the second case lineage is completely lost after the checkpoint and indChk
doesn't carry an information required to rebuild it anymore.
checkpoint
, unlike cache
/ persist
is computed separately from other jobs. That's why RDD marked for checkpointing should be cached:
It is strongly recommended that this RDD is persisted in memory, otherwise saving it on a file will require recomputation.
Finally checkpointed
data is persistent and not removed after SparkContext
is destroyed.
Regarding data storage SparkContext.setCheckpointDir
used by RDD.checkpoint
requires DFS
path if running in non-local mode. Otherwise it can be local files system as well. localCheckpoint
and persist
without replication should use local file system.
Important Note:
RDD checkpointing is a different concept than a chekpointing in Spark Streaming. The former one is designed to address lineage issue, the latter one is all about streaming reliability and failure recovery.
I think you can find a very detailed answer here
While it is very hard to summarize all in that page, I will say
Persist
Checkpointing
You may want to read the article for more of the details or internals of Spark's checkpointing or Cache operations.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With