
Does cache() in Spark change the state of the RDD or create a new one?

This question is a follow-up to my previous question, "What happens if I cache the same RDD twice in Spark".

When cache() is called on an RDD, does the state of the RDD change (with the returned RDD being just this, for ease of use), or is a new RDD created that wraps the existing one?

What will happen in the following code:

// Init
JavaRDD<String> a = ... // some initialisation and calculation functions
JavaRDD<String> b = a.cache();
JavaRDD<String> c = b.cache();

// Case 1: will 'a' be calculated twice here,
// because it sits before the cache layer?
a.saveAsTextFile(somePath);
a.saveAsTextFile(somePath);

// Case 2: will the result of calculating 'a'
// be cached in memory twice here
// (once as 'b' and once as 'c')?
c.saveAsTextFile(somePath);
Roee Gavirel asked Mar 13 '23 19:03


1 Answer

When cache() is called on an RDD, does the state of the RDD change (with the returned RDD being just this, for ease of use), or is a new RDD created that wraps the existing one?

The same RDD is returned:

/**
 * Mark this RDD for persisting using the specified level.
 *
 * @param newLevel the target storage level
 * @param allowOverride whether to override any existing level with the new one
 */
private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
  // TODO: Handle changes of StorageLevel
  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of an RDD after it was already assigned a level")
  }
  // If this is the first time this RDD is marked for persisting, register it
  // with the SparkContext for cleanups and accounting. Do this only once.
  if (storageLevel == StorageLevel.NONE) {
    sc.cleaner.foreach(_.registerRDDForCleanup(this))
    sc.persistRDD(this)
  }
  storageLevel = newLevel
  this
}

Caching doesn't create a new RDD: it sets the storage level on the existing RDD and returns this. If the RDD is already marked for persistence at the same level, the call changes nothing. If it isn't marked yet, the only additional side effect is registering it with the SparkContext, and that side effect is on the context rather than on the RDD itself.
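To make the control flow of the quoted persist method easier to follow, here is a minimal, Spark-free sketch in Java. SketchRdd, its Level enum, and the registrations counter are hypothetical stand-ins invented for illustration (they are not Spark classes), and the allowOverride flag is left out for brevity. The sketch only models the bookkeeping: reject a different level, register once, return this.

```java
// Hypothetical stand-in for Spark's RDD; only the persist bookkeeping is modeled.
final class SketchRdd {
    // Hypothetical stand-in for Spark's StorageLevel.
    enum Level { NONE, MEMORY_ONLY, DISK_ONLY }

    private Level storageLevel = Level.NONE;
    // Counts how often the "register with the context" step runs.
    private int registrations = 0;

    // Mirrors the quoted logic: refuse a different level, register once, return this.
    SketchRdd persist(Level newLevel) {
        if (storageLevel != Level.NONE && newLevel != storageLevel) {
            throw new UnsupportedOperationException(
                "Cannot change storage level of an RDD after it was already assigned a level");
        }
        if (storageLevel == Level.NONE) {
            registrations++;
        }
        storageLevel = newLevel;
        return this;
    }

    // Assumed here to delegate to persist with the MEMORY_ONLY level.
    SketchRdd cache() {
        return persist(Level.MEMORY_ONLY);
    }

    Level getStorageLevel() { return storageLevel; }

    int getRegistrations() { return registrations; }
}

final class PersistSketchDemo {
    public static void main(String[] args) {
        SketchRdd a = new SketchRdd();
        SketchRdd b = a.cache();
        SketchRdd c = b.cache();
        // All three variables refer to the same object.
        System.out.println(a == b && b == c);
        // The registration step ran only on the first call.
        System.out.println(a.getRegistrations());
    }
}
```

In this model the second cache() call passes both checks without doing any work, which is why calling it repeatedly is harmless, while asking for a different level afterwards throws.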

Edit:

Looking at JavaRDD.cache, it seems that the underlying call will cause the allocation of another JavaRDD:

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): JavaRDD[T] = wrapRDD(rdd.cache())

Where wrapRDD calls JavaRDD.fromRDD:

object JavaRDD {

  implicit def fromRDD[T: ClassTag](rdd: RDD[T]): JavaRDD[T] = new JavaRDD[T](rdd)
  implicit def toRDD[T](rdd: JavaRDD[T]): RDD[T] = rdd.rdd
}

This allocates a new JavaRDD wrapper on every call. That said, the internal instance of RDD[T] remains the same, so a, b and c in the question all wrap one underlying RDD, which is marked for caching only once.
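The difference between the wrapper and the wrapped object can be sketched without Spark. InnerRdd and WrapperRdd below are hypothetical classes made up for this illustration; they loosely play the roles of RDD[T] and JavaRDD[T], under the assumption that the wrapper's cache() behaves like wrapRDD(rdd.cache()) as quoted above.

```java
// Hypothetical stand-in for the Scala RDD[T]: cache() mutates state and returns this.
final class InnerRdd {
    private boolean cached = false;

    InnerRdd cache() {
        cached = true;
        return this;
    }

    boolean isCached() { return cached; }
}

// Hypothetical stand-in for JavaRDD[T]: a thin wrapper around one InnerRdd.
final class WrapperRdd {
    private final InnerRdd rdd;

    WrapperRdd(InnerRdd rdd) { this.rdd = rdd; }

    InnerRdd rdd() { return rdd; }

    // Modeled on wrapRDD(rdd.cache()): a new wrapper around the same inner object.
    WrapperRdd cache() {
        return new WrapperRdd(rdd.cache());
    }
}

final class WrapperSketchDemo {
    public static void main(String[] args) {
        WrapperRdd a = new WrapperRdd(new InnerRdd());
        WrapperRdd b = a.cache();
        WrapperRdd c = b.cache();
        // Compares the wrappers: each cache() call allocated a new one.
        System.out.println(a == b);
        // Compares the inner objects: all wrappers share one.
        System.out.println(a.rdd() == c.rdd());
        // The shared inner object is marked as cached, even when reached through 'a'.
        System.out.println(a.rdd().isCached());
    }
}
```

With real Spark, the analogous check would presumably be to compare a.rdd() with c.rdd() by reference; that is an untested suggestion here, not something the quoted source demonstrates.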

Yuval Itzchakov answered Mar 15 '23 13:03