Spark RDD checkpoint on persisted/cached RDDs performs the DAG twice

When I run code such as the following:

val newRDD = prevRDD.map(a => (a._1, 1L)).distinct.persist(StorageLevel.MEMORY_AND_DISK_SER)
newRDD.checkpoint
print(newRDD.count())

and watch the stages in YARN, I notice that Spark is doing the DAG calculation TWICE -- once for the distinct+count that materializes the RDD and caches it, and then a completely SECOND time to create the checkpointed copy.

Since the RDD is already materialized and cached, why doesn't the checkpointing simply take advantage of this, and save the cached partitions to disk?

Is there an existing way (some kind of configuration setting or code change) to force Spark to take advantage of this and only run the operation ONCE, and checkpointing will just copy things?

Do I need to "materialize" twice, instead?

val newRDD = prevRDD.map(a => (a._1, 1L)).distinct.persist(StorageLevel.MEMORY_AND_DISK_SER)
print(newRDD.count())

newRDD.checkpoint
print(newRDD.count())
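
For reference, here is the fuller form of that second snippet I'm describing (assuming prevRDD and a SparkContext sc as above; the checkpoint directory path is just an example). Checkpointing needs a checkpoint directory, and the checkpoint itself is written by a separate job launched after the next action completes -- which is why materializing and caching first should let that job read the cached partitions rather than recomputing the lineage:

import org.apache.spark.storage.StorageLevel

// Required before checkpoint(); the path here is only an example.
sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")

val newRDD = prevRDD.map(a => (a._1, 1L))
  .distinct
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

// First action: computes the DAG once and fills the cache.
print(newRDD.count())

// Mark the RDD for checkpointing.
newRDD.checkpoint()

// Second action: after this job finishes, Spark launches the checkpoint job,
// which can read the already-cached partitions instead of recomputing them.
print(newRDD.count())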

I've created an Apache Spark Jira ticket to make this a feature request: https://issues.apache.org/jira/browse/SPARK-8666

asked Jun 26 '15 by Glenn Strycker

People also ask

Does Spark reuse RDDs?

Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures. A second abstraction in Spark is shared variables that can be used in parallel operations.

What is the difference between persist and cache in Spark?

Spark Cache vs Persist: both caching and persisting are used to save a Spark RDD, DataFrame, or Dataset. The difference is that cache() stores it at the default storage level (MEMORY_ONLY), whereas persist() lets you store it at a user-defined storage level (a short sketch follows this section).

Why do we use persist() on an RDD?

When we persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in later operations on that data. This can speed up subsequent computations considerably, often by an order of magnitude. Once the RDD has been computed for the first time, it is kept in memory on the node.
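
As a quick illustration of the point above (the input path and variable names are illustrative, assuming an existing SparkContext sc): cache() is shorthand for persist(StorageLevel.MEMORY_ONLY), while persist() accepts an explicit storage level.

import org.apache.spark.storage.StorageLevel

val lines = sc.textFile("hdfs:///data/input.txt")  // illustrative path

// cache() is equivalent to persist(StorageLevel.MEMORY_ONLY)
val inMemory = lines.map(_.toUpperCase).cache()

// persist() takes a user-chosen level, e.g. spill to disk when memory is full
val spillable = lines.map(_.toLowerCase).persist(StorageLevel.MEMORY_AND_DISK_SER)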


2 Answers

Looks like this may be a known issue. See an older JIRA ticket, https://issues.apache.org/jira/browse/SPARK-8582

answered Oct 20 '22 by Glenn Strycker


This is an old question, but it affected me as well, so I did some digging. I found a lot of very unhelpful search results in the change-tracking history on JIRA and GitHub -- mostly developer discussion about proposed code changes. It didn't end up being very informative for me, and I would suggest limiting the amount of time you spend looking through it.

The clearest information I could find on the matter is here: https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md

An RDD which needs to be checkpointed will be computed twice; thus it is suggested to do a rdd.cache() before rdd.checkpoint()

Given that the OP actually did use persist and checkpoint, he was probably on the right track. I suspect the only problem was in the way he invoked checkpoint. I'm fairly new to Spark, but I think he should have done it like so:

newRDD = newRDD.checkpoint

Hope this is clear. Based on my testing, this eliminated the redundant recomputation of one of my DataFrames.
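
One caveat, based on my reading of the API rather than the original post: checkpoint() on a Dataset/DataFrame returns a new, checkpointed Dataset, so the reassignment works there, whereas RDD.checkpoint() returns Unit and only marks the RDD, so the RDD snippet in the question cannot be reassigned the same way. A minimal sketch of the DataFrame pattern (the input path and checkpoint directory are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("checkpoint-sketch").getOrCreate()
spark.sparkContext.setCheckpointDir("hdfs:///tmp/spark-checkpoints")  // illustrative

var df = spark.read.parquet("hdfs:///data/events.parquet")  // illustrative input
  .groupBy("key")
  .count()

// Dataset.checkpoint() is eager by default: it materializes the plan once,
// writes the result to the checkpoint directory, and returns a Dataset
// backed by those files with its lineage truncated.
df = df.checkpoint()

println(df.count())  // served from the checkpointed data, no recomputation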

answered Oct 20 '22 by David Beavon