This question is a follow-up to a previous question of mine: What happens if I cache the same RDD twice in Spark.
When calling cache() on an RDD, does the state of the RDD change (and the returned RDD is just this for ease of use), or is a new RDD created that wraps the existing one?
What will happen in the following code:
// Init
JavaRDD<String> a = ... // some initialization and computation functions.
JavaRDD<String> b = a.cache();
JavaRDD<String> c = b.cache();
// Case 1: will 'a' be computed twice in this case,
// because it comes before the cache layer?
a.saveAsTextFile(somePath);
a.saveAsTextFile(somePath);
// Case 2: will the result of computing 'a'
// be cached in memory twice in this case
// (once as 'b' and once as 'c')?
c.saveAsTextFile(somePath);
When calling cache() on an RDD, does the state of the RDD change (and the returned RDD is just this for ease of use) or is a new RDD created that wraps the existing one?
The same RDD is returned:
/**
 * Mark this RDD for persisting using the specified level.
 *
 * @param newLevel the target storage level
 * @param allowOverride whether to override any existing level with the new one
 */
private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
  // TODO: Handle changes of StorageLevel
  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of an RDD after it was already assigned a level")
  }
  // If this is the first time this RDD is marked for persisting, register it
  // with the SparkContext for cleanups and accounting. Do this only once.
  if (storageLevel == StorageLevel.NONE) {
    sc.cleaner.foreach(_.registerRDDForCleanup(this))
    sc.persistRDD(this)
  }
  storageLevel = newLevel
  this
}
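As a small, hedged illustration of the guard in that snippet (assuming an already-built JavaRDD named a, and the Java API's StorageLevel factory methods):

import org.apache.spark.storage.StorageLevel;

a.cache();                                 // marks the RDD with MEMORY_ONLY
a.cache();                                 // no-op: same level, nothing changes
a.persist(StorageLevel.MEMORY_AND_DISK()); // throws UnsupportedOperationException,
                                           // since a level was already assigned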
Caching doesn't cause any side effect on the RDD in question. If it's already marked for persistence, nothing happens. If it isn't, the only side effect is registering it with the SparkContext, and that side effect is on the context, not on the RDD itself.
Edit:
Looking at JavaRDD.cache, it seems that the underlying call will cause the allocation of another JavaRDD:
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): JavaRDD[T] = wrapRDD(rdd.cache())
Where wrapRDD calls JavaRDD.fromRDD:
object JavaRDD {
  implicit def fromRDD[T: ClassTag](rdd: RDD[T]): JavaRDD[T] = new JavaRDD[T](rdd)
  implicit def toRDD[T](rdd: JavaRDD[T]): RDD[T] = rdd.rdd
}
Which will cause the allocation of a new JavaRDD. That said, the internal instance of RDD[T] will remain the same.
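A quick way to check this, as a sketch reusing a, b, and c from the question (JavaRDD exposes its underlying RDD via rdd()):

System.out.println(b == c);             // false: two distinct JavaRDD wrappers
System.out.println(b.rdd() == c.rdd()); // true: both wrap the same RDD[T]
System.out.println(a.rdd() == b.rdd()); // true: cache() did not replace the RDD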