
RDD.checkpoint() not storing any data in checkpoint directory


I've set the checkpoint directory with the sc.setCheckpointDir method.

/checkpointDirectory/

I've then checkpointed an RDD with rdd.checkpoint(), and in the checkpoint directory I now see a new subdirectory named with a random string of letters. Inside that directory there is nothing.

/checkpointDirectory/37d2812a-bca2-4fc5-b5d4-221ae03a6d25/  [empty]

Then, after doing a couple of transformations, I run rdd.checkpoint() again, and there is still nothing in that recently created directory:

/checkpointDirectory/37d2812a-bca2-4fc5-b5d4-221ae03a6d25/  [empty]

Am I using checkpoint() wrong? What am I supposed to see in that directory to know it's working properly?

Asked Jun 12 '16 by Kristian

People also ask

When you mark an RDD for checkpointing, where is it saved?

You mark an RDD for checkpointing by calling RDD.checkpoint(). The RDD will be saved to a file inside the checkpoint directory, and all references to its parent RDDs will be removed.
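As a minimal sketch of that sequence (PySpark, local mode; the path is illustrative), note that checkpoint() only marks the RDD, and an action must run before anything is written:

sc.setCheckpointDir("/tmp/checkpoints/")

rdd = sc.parallelize(range(100))
rdd.checkpoint()   # marks the RDD; nothing is written yet
rdd.count()        # first action materializes the RDD and writes the files
rdd.isCheckpointed()
## True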

How do I set a checkpoint folder in Spark?

To set the checkpoint directory, call SparkContext.setCheckpointDir(directory: String). When running on a cluster, the directory must be an HDFS path, because the driver attempts to recover the checkpointed RDD from its own local file system, while the checkpoint files actually live on the executors' machines.
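A short sketch of both setups (both paths are illustrative):

# Local mode: a local filesystem path is fine
sc.setCheckpointDir("/tmp/checkpoints/")

# Cluster mode: use a path visible to both driver and executors, e.g. HDFS
# sc.setCheckpointDir("hdfs:///user/someuser/checkpoints/")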

What is the use of checkpointing in Spark?

Checkpointing can be used to truncate the logical plan of a DataFrame, which is especially useful in iterative algorithms where the plan may grow exponentially. The data will be saved to files inside the checkpoint directory set with SparkContext.setCheckpointDir(). New in version 2.1.
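For instance, a hedged sketch of plan truncation in an iterative job (assumes an active SparkSession named spark; the column names are made up):

from pyspark.sql import functions as F

spark.sparkContext.setCheckpointDir("/tmp/checkpoints/")

df = spark.range(1000).withColumn("value", F.rand())
for i in range(20):
    df = df.withColumn("value", F.col("value") * 1.01)
    if i % 5 == 4:
        # DataFrame.checkpoint() is eager by default: it runs a job, writes
        # the data, and returns a DataFrame whose plan starts at the checkpoint
        df = df.checkpoint()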

What is the difference between RDD, DataFrame, and Dataset?

While RDD offers low-level control over data, the Dataset and DataFrame APIs bring structure and high-level abstractions. Keep in mind that transformations from an RDD to a Dataset or DataFrame are easy to execute.
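PySpark exposes DataFrame but not the typed Dataset API (that one is Scala/Java only), so a sketch of the easy conversions looks like this (assumes an active SparkSession, as in the pyspark shell):

rdd = sc.parallelize([(1, "a"), (2, "b")])

# RDD -> DataFrame: supply column names, the schema is inferred
df = rdd.toDF(["id", "letter"])

# DataFrame -> RDD of Row objects
df.rdd.first()
## Row(id=1, letter='a')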


1 Answer

checkpoint, like many other operations in Spark, is lazy. Data is actually checkpointed if and only if a given RDD is materialized. The empty directory you see is the application-specific checkpoint directory; per-RDD subdirectories appear inside it only after an action runs.

If you want the checkpoint to take place, you have to trigger an action which evaluates the corresponding RDD. For example (local mode):

import glob
import os
from urllib.parse import urlparse

sc.setCheckpointDir("/tmp/checkpoints/")
# Resolve the application-specific checkpoint directory through the Java context
ch_dir = os.path.join(urlparse(sc._jsc.getCheckpointDir().orElse("")).path, "*")

rdd = sc.range(0, 1000, 10)
plus_one = rdd.map(lambda x: x + 1)
plus_one.cache()  # recommended: persist so checkpointing doesn't recompute the RDD
plus_one.checkpoint()  # No checkpoint dir here yet

[os.path.split(x)[-1] for x in glob.glob(ch_dir)]
## []
plus_one.isCheckpointed()
## False

# After count is executed you'll see rdd specific checkpoint dir
plus_one.count()  
[os.path.split(x)[-1] for x in glob.glob(ch_dir)]
## ['rdd-1']
plus_one.isCheckpointed()
## True

You can also analyze the debug string (plus_one.toDebugString()) before the action:

## (8) PythonRDD[1] at RDD at PythonRDD.scala:48 [Memory Serialized 1x Replicated]
##  |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:475 [Memory Serialized 1x Replicated]

and after an action:

## (8) PythonRDD[1] at RDD at PythonRDD.scala:48 [Memory Serialized 1x Replicated]
##  |       CachedPartitions: 8; MemorySize: 168.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
##  |  ReliableCheckpointRDD[3] at count at <ipython-input-16-96e746c56973>:1 [Memory Serialized 1x Replicated]

As you can see, before the action the RDD would be computed from scratch, but after count it is read back from the ReliableCheckpointRDD.
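As a follow-up, once the checkpoint exists you can ask the RDD where it was written (the output format here is illustrative):

plus_one.getCheckpointFile()
## 'file:/tmp/checkpoints/<application-uuid>/rdd-1'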

Answered Sep 28 '22 by zero323