When I run code such as the following:
val newRDD = prevRDD.map(a => (a._1, 1L)).distinct.persist(StorageLevel.MEMORY_AND_DISK_SER)
newRDD.checkpoint
print(newRDD.count())
and watch the stages in YARN, I notice that Spark is doing the DAG computation TWICE -- once for the distinct + count that materializes the RDD and caches it, and then a completely separate SECOND time to create the checkpointed copy.
Since the RDD is already materialized and cached, why doesn't the checkpointing simply take advantage of this, and save the cached partitions to disk?
Is there an existing way (some kind of configuration setting or code change) to force Spark to take advantage of this and run the operation only ONCE, with checkpointing simply copying the cached partitions?
Do I need to "materialize" twice, instead?
val newRDD = prevRDD.map(a => (a._1, 1L)).distinct.persist(StorageLevel.MEMORY_AND_DISK_SER)
print(newRDD.count())
newRDD.checkpoint
print(newRDD.count())
I've created an Apache Spark Jira ticket to make this a feature request: https://issues.apache.org/jira/browse/SPARK-8666
Users may ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations, and persisted RDDs automatically recover from node failures.

Spark cache vs persist: both are used to save an RDD, DataFrame, or Dataset, but the difference is that the RDD cache() method saves it at the default storage level (MEMORY_ONLY), whereas persist() stores it at a user-defined storage level.

When we persist an RDD, each node stores the partitions it computes in memory and makes them reusable for later actions, which can speed up subsequent computation substantially (often an order of magnitude). The RDD is kept in memory on the node from the first time it is computed.
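To illustrate the cache vs persist distinction, here is a minimal sketch (the input file, variable names, and the pre-existing SparkContext `sc` are assumptions for illustration):

```scala
import org.apache.spark.storage.StorageLevel

// Hypothetical word-count RDD; assumes an existing SparkContext `sc`
val counts = sc.textFile("data.txt")
  .flatMap(_.split(" "))
  .map(w => (w, 1L))
  .reduceByKey(_ + _)

// cache() is shorthand for persist(StorageLevel.MEMORY_ONLY) on an RDD:
counts.cache()
// ...whereas persist() lets you pick the level explicitly, e.g.:
// counts.persist(StorageLevel.MEMORY_AND_DISK_SER)

counts.count() // first action computes the partitions and caches them
counts.count() // later actions read the cached partitions
```

Note that an RDD can only be marked with one storage level; calling persist() with a different level after cache() throws an exception.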
Looks like this may be a known issue. See an older JIRA ticket, https://issues.apache.org/jira/browse/SPARK-8582
This is an old question, but it affected me as well, so I did some digging. Most of what I found in the JIRA and GitHub change history was developer discussion of proposed implementation changes; it didn't end up being very informative, and I would suggest limiting the amount of time you spend looking at it.
The clearest information I could find on the matter is here: https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md
An RDD which needs to be checkpointed will be computed twice; thus it is suggested to do a rdd.cache() before rdd.checkpoint()
Given that the OP actually did use persist and checkpoint, he was probably on the right track. I suspect the only problem was in the way he invoked checkpoint. I'm fairly new to Spark, but note that the two APIs differ: RDD.checkpoint() returns Unit and just marks the RDD for checkpointing at the next action, whereas Dataset.checkpoint() returns a NEW, checkpointed Dataset that you must use in place of the original:
val checkpointedDF = someDF.checkpoint()
Hope this is clear. Based on my testing, reassigning to the returned Dataset eliminates the redundant recomputation of one of my DataFrames.
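Putting the pieces together, here is a sketch of the cache-before-checkpoint pattern for both APIs (names such as `prevRDD` and `someDF` come from the discussion above; the checkpoint directory and the pre-existing `sc` are assumptions):

```scala
import org.apache.spark.storage.StorageLevel

// Required before checkpointing RDDs; assumes an existing SparkContext `sc`
sc.setCheckpointDir("/tmp/spark-checkpoints")

// RDD API: checkpoint() returns Unit. Cache first, so the separate
// checkpointing job can read the cached partitions instead of
// recomputing the whole lineage from scratch.
val newRDD = prevRDD.map(a => (a._1, 1L)).distinct()
  .persist(StorageLevel.MEMORY_AND_DISK_SER)
newRDD.checkpoint()      // only marks the RDD; nothing runs yet
println(newRDD.count())  // materializes, caches, then writes the checkpoint

// Dataset/DataFrame API: checkpoint() is eager by default and RETURNS
// the checkpointed Dataset -- you must keep using the return value.
val checkpointedDF = someDF.checkpoint()
```

The RDD-side caching does not avoid the second job that SPARK-8582 describes, but it means the second pass reads cached partitions rather than re-running the full DAG.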