When are cache and persist executed (since they don't seem like actions)?

I am implementing a Spark application, of which below is a sample snippet (not the exact code):

val rdd1 = sc.textFile(HDFS_PATH)
val rdd2 = rdd1.map(func)
rdd2.persist(StorageLevel.MEMORY_AND_DISK)
println(rdd2.count)

On checking the performance of this code from the Spark Application Master UI, I see an entry for the count action, but not for the persist. The DAG for this count action also has a node for the 'map' transformation (line 2 of the above code).

Is it safe to conclude that the map transformation is executed when count (in the last line) is encountered, and not when persist is encountered?

Also, at what point is rdd2 actually persisted? I understand that only two types of operations can be called on RDDs - transformations and actions. If the RDD is persisted lazily when the count action is called, would persist be considered a transformation or an action or neither?

Asked May 16 '17 by Ankit Khettry

People also ask

What is persist and cache?

The only difference between cache() and persist() is that cache() saves the intermediate results in memory only, while persist() lets us save them at any of the five storage levels (MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, DISK_ONLY).

What is the difference between cache () and persist ()?

The difference between cache() and persist() is that with cache() the default storage level is MEMORY_ONLY, while with persist() we can use the various storage levels described above. It is a key tool for iterative algorithms and interactive use.
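For example (a minimal sketch; nums and doubled are just illustrative RDDs):

import org.apache.spark.storage.StorageLevel

val nums = sc.parallelize(1 to 100)

// cache() is shorthand for persist(StorageLevel.MEMORY_ONLY)
nums.cache()

// persist() accepts an explicit storage level; note that a storage
// level can only be assigned to an RDD once, so use one or the other
val doubled = nums.map(_ * 2)
doubled.persist(StorageLevel.MEMORY_AND_DISK)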

When should I use persist in Spark?

Spark RDD persistence is an optimization technique that saves the result of an RDD evaluation in memory. By saving the intermediate result we can reuse it later if required, which reduces the computation overhead.
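As a sketch of that reuse (HDFS_PATH and func as in the question's snippet):

import org.apache.spark.storage.StorageLevel

val rdd2 = sc.textFile(HDFS_PATH).map(func)
rdd2.persist(StorageLevel.MEMORY_AND_DISK)

rdd2.count()    // first action: runs the map and caches the result
rdd2.take(10)   // second action: reads the cached partitions, no recomputation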

Is persist an action or transformation?

Neither. cache() and persist() are just functions on an RDD that mark it to be cached or persisted. The first time the RDD is evaluated as a consequence of an action, it is actually persisted/cached. So cache() and persist() are neither actions nor transformations.
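You can observe this laziness from the driver. A minimal sketch, assuming a SparkContext sc (getRDDStorageInfo is a DeveloperApi that lists RDDs with cached blocks):

import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(1 to 100).map(_ * 2)
rdd.persist(StorageLevel.MEMORY_AND_DISK)

sc.getRDDStorageInfo.length   // 0 -- nothing has been materialized yet
rdd.count()                   // the first action triggers computation and caching
sc.getRDDStorageInfo.length   // 1 -- the RDD now has cached partitions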


1 Answer

Dataset's cache and persist operators are lazy and don't have any effect until you call an action (and you must wait until the caching has finished, which is the extra price to pay for better performance later on).

From the RDD Persistence section of Spark's official documentation:

One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.

You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Spark’s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.

That's exactly the reason why some people (and Spark SQL itself!) do the following trick:

rdd2.persist(StorageLevel.MEMORY_AND_DISK).count

to trigger the caching.

The count operator is fairly cheap, so the net effect is that the caching is executed almost immediately after that line (there may be a small delay before the caching completes, as it executes asynchronously).

The benefits of this count after persist are as follows:

  1. No action (other than the count itself) will "suffer" the extra time for caching

  2. The time between this line and the place where the cached rdd2 is used may be enough to complete the caching, so that time is spent more productively (with no extra "slowdown" for caching); see the sketch after this list
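Putting the pattern together, continuing the example above (a sketch; expensiveFunc and combine are hypothetical placeholders for your real workload):

rdd2.persist(StorageLevel.MEMORY_AND_DISK)
rdd2.count()   // cheap action: pays the one-off caching cost up front

// ... other driver-side work happens here, giving the asynchronous
// caching time to finish ...

val result = rdd2.map(expensiveFunc).reduce(combine)   // reads cached partitions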

So when you asked:

would persist be considered a transformation or an action or neither?

I'd say it's neither; consider it an optimization hint (one that may or may not ever be executed or taken into account).
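In that spirit, the hint is recorded immediately even though nothing has been cached yet, as RDD.getStorageLevel shows (a small sketch, continuing the example above):

rdd2.persist(StorageLevel.MEMORY_AND_DISK)
rdd2.getStorageLevel   // StorageLevel(disk, memory, deserialized, 1 replicas)
                       // the hint is set, but no partitions are cached
                       // until an action runs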


Use the web UI's Storage tab to see which Datasets (via their underlying RDDs) have already been persisted.


You can also see the effect of the cache and persist operators using explain (or simply QueryExecution.optimizedPlan).

val q1 = spark.range(10).groupBy('id % 5).agg(count("*") as "count").cache
scala> q1.explain
== Physical Plan ==
*(1) ColumnarToRow
+- InMemoryTableScan [(id % 5)#120L, count#119L]
      +- InMemoryRelation [(id % 5)#120L, count#119L], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(2) HashAggregate(keys=[(id#0L % 5)#8L], functions=[count(1)])
               +- Exchange hashpartitioning((id#0L % 5)#8L, 200), true, [id=#13]
                  +- *(1) HashAggregate(keys=[(id#0L % 5) AS (id#0L % 5)#8L], functions=[partial_count(1)])
                     +- *(1) Range (0, 10, step=1, splits=16)

scala> println(q1.queryExecution.optimizedPlan.numberedTreeString)
00 InMemoryRelation [(id % 5)#5L, count#4L], StorageLevel(disk, memory, deserialized, 1 replicas)
01    +- *(2) HashAggregate(keys=[(id#0L % 5)#8L], functions=[count(1)], output=[(id % 5)#5L, count#4L])
02       +- Exchange hashpartitioning((id#0L % 5)#8L, 200), true, [id=#13]
03          +- *(1) HashAggregate(keys=[(id#0L % 5) AS (id#0L % 5)#8L], functions=[partial_count(1)], output=[(id#0L % 5)#8L, count#10L])
04             +- *(1) Range (0, 10, step=1, splits=16)

Please note that the count above is a standard aggregation function, not an action, so no caching happens. It is just a coincidence that count is both the name of a standard function and of a Dataset action.
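To see the distinction (a sketch): count("*") inside agg is the standard aggregation function and stays lazy, while Dataset.count() is the action that actually runs a job (and, for a cached Dataset, triggers the caching):

import org.apache.spark.sql.functions.count
import spark.implicits._   // for the 'id syntax (auto-imported in spark-shell)

val grouped = spark.range(10).groupBy('id % 5).agg(count("*") as "count")
// nothing has executed yet -- count("*") is merely part of the lazy plan

grouped.count()   // Dataset.count() is the action: this triggers a job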

You can also cache a table using pure SQL (this is eager!):

// This registers range5 to hold the output of the range(5) function
spark.sql("CACHE TABLE range5 AS SELECT * FROM range(5)")
val q2 = spark.sql("SELECT * FROM range5")
scala> q2.explain
== Physical Plan ==
*(1) ColumnarToRow
+- Scan In-memory table `range5` [id#51L]
      +- InMemoryRelation [id#51L], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(1) Range (0, 5, step=1, splits=16)

The InMemoryTableScan physical operator (with the InMemoryRelation logical plan) is how you can make sure that the query is cached in memory and hence reused.
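If you would rather check programmatically than read plans, a sketch under the assumption that finding an InMemoryRelation in the optimized plan is sufficient for your case:

import org.apache.spark.sql.execution.columnar.InMemoryRelation

val usesCache = q2.queryExecution.optimizedPlan.find {
  case _: InMemoryRelation => true
  case _ => false
}.isDefined

// For tables registered in the catalog there is also a public API
spark.catalog.isCached("range5")   // true after the CACHE TABLE above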


Moreover, Spark SQL itself uses the same pattern to trigger DataFrame caching for SQL's CACHE TABLE query (which, unlike RDD caching, is eager by default):

if (!isLazy) {
  // Performs eager caching
  sparkSession.table(tableIdent).count()
}

That means that, depending on the operator, you may get different results as far as caching is concerned: the cache and persist operators are lazy by default, while SQL's CACHE TABLE is eager.
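SQL gives you the lazy behaviour too if you ask for it, using the LAZY keyword (a sketch; range5_eager and range5_lazy are just illustrative names):

// Eager (the default): materializes the cache immediately
spark.sql("CACHE TABLE range5_eager AS SELECT * FROM range(5)")

// Lazy: behaves like Dataset.cache -- materialized on first use
spark.sql("CACHE LAZY TABLE range5_lazy AS SELECT * FROM range(5)")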

Answered Oct 11 '22 by Jacek Laskowski