
Understanding Spark's caching

Tags:

apache-spark

I'm trying to understand how Spark's cache works.

Here is my naive understanding, please let me know if I'm missing something:

val rdd1 = sc.textFile("some data")
rdd1.cache() // marks rdd1 as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)
rdd2.saveAsTextFile("...")
rdd3.saveAsTextFile("...")

In the above, rdd1 will be loaded from disk (e.g. HDFS) only once (when rdd2 is saved, I assume) and then read from the cache (assuming there is enough RAM) when rdd3 is saved.

Now here is my question. Let's say I want to cache rdd2 and rdd3 as they will both be used later on, but I don't need rdd1 after creating them.

Basically there is duplication, isn't there? Since I don't need rdd1 anymore once rdd2 and rdd3 are calculated, I should probably unpersist it, right? The question is when.

Will this work? (Option A)

val rdd1 = sc.textFile("some data")
rdd1.cache() // marks rdd1 as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)
rdd2.cache()
rdd3.cache()
rdd1.unpersist()

Does Spark add the unpersist call to the DAG, or is it executed immediately? If it's executed immediately, then rdd1 will effectively be uncached when I read from rdd2 and rdd3, right?

Should I do it this way instead (Option B)?

val rdd1 = sc.textFile("some data")
rdd1.cache() // marks rdd1 as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)

rdd2.cache()
rdd3.cache()

rdd2.saveAsTextFile("...")
rdd3.saveAsTextFile("...")

rdd1.unpersist()

So the question is this: is Option A good enough, i.e. will rdd1 still load the file only once? Or do I need to go with Option B?

Eran Medan asked Apr 27 '15


People also ask

How does cache work in PySpark?

Spark will cache whatever it can in memory and spill the rest to disk. Reading data from the source (hdfs:// or s3://) is time-consuming. So after you read data from the source and apply all the common operations, cache it if you are going to reuse the data.
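To make that concrete, here is a small sketch (the events name and the input path are made up for illustration) that uses an explicit storage level, so partitions that don't fit in memory spill to disk instead of being recomputed:

import org.apache.spark.storage.StorageLevel

// Hypothetical input path; for RDDs, cache() alone would use MEMORY_ONLY,
// while MEMORY_AND_DISK lets partitions that don't fit in RAM spill to disk.
val events = sc.textFile("hdfs:///some/path/events.log")
events.persist(StorageLevel.MEMORY_AND_DISK)

// Both actions reuse the persisted partitions instead of re-reading the source.
val errorCount = events.filter(_.contains("ERROR")).count()
val lineCount  = events.count()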

When should I cache in Spark?

Caching is recommended in the following situations: for RDD re-use in iterative machine learning applications; for RDD re-use in standalone Spark applications; and when RDD computation is expensive, where caching can help reduce the cost of recovery in case an executor fails.
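A minimal sketch of the iterative re-use case, assuming a spark-shell session where sc is available (the points dataset and the loop body are invented for illustration): without cache(), every pass over the loop would re-read and re-parse the input.

// Hypothetical iterative job: the parsed RDD is reused on every iteration,
// so it is cached once up front and released when the loop is done.
val points = sc.textFile("hdfs:///some/path/points.txt")
  .map(_.split(",").map(_.toDouble))
  .cache()

for (i <- 1 to 10) {
  // Each count() is an action; after the first one, the data comes from the cache.
  println(s"iteration $i, count = ${points.count()}")
}

points.unpersist()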

What is RDD caching?

RDD caching and persistence are optimisation techniques for (iterative and interactive) Spark computations. They help save interim partial results so they can be reused in subsequent stages.
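For the RDD API specifically, cache() is just shorthand for persist() with the default MEMORY_ONLY storage level, as the short sketch below illustrates (the rdd value is a placeholder):

import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(1 to 100)   // any RDD

// For RDDs these two calls are equivalent; cache() defaults to MEMORY_ONLY.
rdd.cache()
// rdd.persist(StorageLevel.MEMORY_ONLY)   // explicit form of the same thing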


2 Answers

It would seem that Option B is required. The reason is related to how persist/cache and unpersist are executed by Spark. Since RDD transformations merely build DAG descriptions without execution, in Option A by the time you call unpersist, you still only have job descriptions and not a running execution.

This is relevant because a cache or persist call just adds the RDD to a Map of RDDs that are marked to be persisted during job execution. However, unpersist directly tells the blockManager to evict the RDD from storage and removes the reference from the Map of persistent RDDs.

(See the persist and unpersist functions in the Spark source.)
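One way to observe this timing yourself is sc.getPersistentRDDs, which lists the RDDs currently registered for persistence; the sketch below (assuming a spark-shell session) shows that cache() only registers the RDD, while unpersist() takes effect immediately rather than at the next action:

val rdd1 = sc.parallelize(1 to 1000)

rdd1.cache()                         // only marks the RDD; nothing is materialised yet
println(sc.getPersistentRDDs.size)   // 1 -- the RDD is registered as "to be persisted"

rdd1.count()                         // the first action actually stores the blocks

rdd1.unpersist()                     // takes effect immediately, not at the next action
println(sc.getPersistentRDDs.size)   // 0 -- the blocks are gone from the block manager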

So you would need to call unpersist after Spark actually executed and stored the RDD with the block manager.

The comments for the RDD.persist method hint at this as well.

Rich answered Oct 02 '22


In Option A, you have not shown when you are calling the action (the call to save):

val rdd1 = sc.textFile("some data")
rdd1.cache() // marks rdd1 as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)
rdd2.cache()
rdd3.cache()
rdd1.unpersist()
rdd2.saveAsTextFile("...")
rdd3.saveAsTextFile("...")

If the sequence is as above, Option A should use the cached version of rdd1 for computing both rdd2 and rdd3.

ayan guha answered Oct 02 '22