 

Where is cached RDD stored (i.e. in a distributed way or on a single node)?

When we cache an RDD in Spark, is it then stored in a distributed way or on a single node? In which system's memory is it stored?

asked Jun 03 '17 by Neha Sharma

2 Answers

The data remains distributed and is not stored on a single node after a cache operation. Neither cache nor persist changes or affects the distribution of an RDD: the distribution before and after the cache operation remains the same.

answered Sep 28 '22 by rogue-one


cache is a lazy operator (it is more a hint than a transformation or an action in the RDD sense).

Quoting the scaladoc of cache:

cache(): RDD.this.type
Persist this RDD with the default storage level (MEMORY_ONLY).
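As a quick illustration (a minimal spark-shell sketch; the StorageLevel output below is from Spark 2.x and may look slightly different in other versions), cache marks the RDD for persistence and returns the very same RDD, with nothing materialized yet:

scala> val rdd = sc.parallelize(0 to 9)
scala> val cached = rdd.cache()  // lazy: nothing is computed or stored yet
scala> cached eq rdd             // cache returns this very RDD instance
res0: Boolean = true
scala> rdd.getStorageLevel       // the storage level is now MEMORY_ONLY
res1: org.apache.spark.storage.StorageLevel = StorageLevel(memory, deserialized, 1 replicas)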

When a Spark application is submitted for execution (using spark-submit), it requests Spark executors. Quoting the official documentation's Components:

Once connected, Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application.

Every executor starts its own BlockManager (with BlockManagerMaster hosted on the driver). Quoting Mastering Apache Spark gitbook:

BlockManager is a key-value store for blocks of data (simply blocks) in Spark. BlockManager acts as a local cache that runs on every "node" in a Spark application, i.e. the driver and executors.

Once you execute an action, it triggers loading the dataset from external data sources and the data starts flowing through the Spark distributed computation pipeline.
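You can watch this happen in spark-shell (a hedged sketch; sc.getPersistentRDDs is part of SparkContext's Scala API): marking the RDD registers it for caching, but only the action actually populates the BlockManagers.

scala> val rdd = sc.parallelize(0 to 9).cache()
scala> sc.getPersistentRDDs.size  // registered as persistent, but no blocks stored yet
res0: Int = 1
scala> rdd.count()                // the action computes the RDD and caches its blocks
res1: Long = 10

After the count, the cached blocks also show up in the web UI's Storage tab.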

That's when the data gets cached in every BlockManager that participates in the computation. Their number is exactly the number of partitions of the cached RDD, which can be checked using the so-called RDD lineage:

RDD Lineage (aka RDD operator graph or RDD dependency graph) is a graph of all the parent RDDs of a RDD. It is built as a result of applying transformations to the RDD and creates a logical execution plan.

You can see the RDD lineage of a Spark computation using RDD.toDebugString:

toDebugString: String
A description of this RDD and its recursive dependencies for debugging.

An example lineage could look as follows:

scala> val rdd = sc.parallelize(0 to 9).groupBy(_ % 3).flatMap { case (_, ns) => ns }
scala> rdd.toDebugString
res4: String =
(8) MapPartitionsRDD[7] at flatMap at <console>:24 []
 |  ShuffledRDD[6] at groupBy at <console>:24 []
 +-(8) MapPartitionsRDD[5] at groupBy at <console>:24 []
    |  ParallelCollectionRDD[4] at parallelize at <console>:24 []

(The 8 partitions are due to the local[*] master URL I used to start spark-shell, where * maps to the number of CPU cores on my machine.)
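As a side note (output abbreviated with ...; exact sizes and formatting vary by Spark version), once the RDD is cached and an action has run, toDebugString also annotates the cached RDD with its storage level and cached partitions:

scala> rdd.cache().count()
scala> rdd.toDebugString
res6: String =
(8) MapPartitionsRDD[7] at flatMap at <console>:24 [Memory Deserialized 1x Replicated]
 |       CachedPartitions: 8; MemorySize: ...; DiskSize: ...
 |  ShuffledRDD[6] at groupBy at <console>:24 []
 ...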

Actually, the number of BlockManagers in use is the number of tasks per stage (in a Spark job), and can be at most the number of Spark executors; it varies per stage.

Wrapping up...

When we cache an RDD in Spark, is it then stored in a distributed way or on a single node?

In a distributed way, but it could happen to be on a single node if the number of partitions in a stage is 1.
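For illustration (a hypothetical sketch): if you collapse the RDD to one partition before caching, its single cached block lives in the BlockManager of whichever executor runs that one task.

scala> val single = sc.parallelize(0 to 9).coalesce(1).cache()
scala> single.getNumPartitions
res0: Int = 1
scala> single.count()  // all cached data now sits in one executor's BlockManager
res1: Long = 10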

In which system's memory is it stored?

In "systems memory" of the Spark executor that hosts the BlockManager which happens to be in charge of RDD blocks.


An RDD is a description of a distributed computation and is "gone" when the DAGScheduler (which runs on the driver) maps it to per-stage TaskSets with as many tasks as partitions.

The RDD and its partitions "disappear" once you execute an action; they are transformed into stages and tasks.

answered Sep 28 '22 by Jacek Laskowski