
Where is the data from df.cache() stored?

I would like to understand on which node (driver or worker/executor) the data cached by the code below is stored:

df.cache() // df is a large DataFrame (200GB)

Also, which has better performance: SQL CACHE TABLE or cache()? My understanding is that one of them is lazy and the other is eager.

asked Aug 08 '17 by shiv455

People also ask

What is DataFrame cache()?

cache() is an Apache Spark transformation that can be used on a DataFrame, Dataset, or RDD when you want to perform more than one action. cache() caches the specified DataFrame, Dataset, or RDD in the memory of your cluster's workers.

Where is cached data stored in Spark?

Spark's DataFrame or Dataset cache() method by default saves it to storage level `MEMORY_AND_DISK` because recomputing the in-memory columnar representation of the underlying table is expensive. Note that this is different from the default cache level of `RDD.cache()`, which is `MEMORY_ONLY`.

Where is cache value stored?

The data in a cache is generally stored in fast access hardware such as RAM (Random-access memory) and may also be used in correlation with a software component. A cache's primary purpose is to increase data retrieval performance by reducing the need to access the underlying slower storage layer.

What is the default storage of cache ()?

The cache() method calls the persist() method with the default storage level MEMORY_AND_DISK. Other storage levels are discussed later. The rule of thumb for caching is to identify the DataFrame that you will be reusing in your Spark application and cache it.




4 Answers

Just adding my 25 cents. A Spark DataFrame's cache() loads the data into executor memory; it does not load it into driver memory, which is what's desired. Here's a snapshot of the data about 50% loaded after a df.cache().count() I just ran.

[screenshot showing the cached data on the executors]

cache() persists to memory and disk, as described by koiralo, and is also lazily evaluated.

cacheTable() stores on disk and is resilient to node failures for this reason.
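If you want to verify where cache() will put the data, you can inspect the DataFrame's storage level. A minimal sketch in Scala, assuming a DataFrame named df already exists:

df.cache()
println(df.storageLevel) // prints the effective storage level; MEMORY_AND_DISK by default for DataFrames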

Credit: https://forums.databricks.com/answers/63/view.html

answered Oct 21 '22 by Vaibhav

df.cache() calls the persist() method, which stores the data with storage level MEMORY_AND_DISK, but you can change the storage level.
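For example, a small sketch (assuming a DataFrame named df already exists):

import org.apache.spark.storage.StorageLevel

df.cache()                                // shorthand for persist(StorageLevel.MEMORY_AND_DISK)
df.unpersist()                            // drop the cached data again
df.persist(StorageLevel.MEMORY_ONLY_SER)  // same mechanism, but with an explicit storage level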

The persist() method calls sparkSession.sharedState.cacheManager.cacheQuery(), and if you look at the code for cacheTable, it calls the same sparkSession.sharedState.cacheManager.cacheQuery().

That means both are the same and are lazily evaluated (only evaluated once an action is performed), except that the persist method can store the data at the storage level you provide. These are the available storage levels:

  • NONE
  • DISK_ONLY
  • DISK_ONLY_2
  • MEMORY_ONLY
  • MEMORY_ONLY_2
  • MEMORY_ONLY_SER
  • MEMORY_ONLY_SER_2
  • MEMORY_AND_DISK
  • MEMORY_AND_DISK_2
  • MEMORY_AND_DISK_SER
  • MEMORY_AND_DISK_SER_2
  • OFF_HEAP

You can also use SQL CACHE TABLE, which is not lazily evaluated: it stores the whole table in memory immediately, which may also lead to OOM.

Summary: cache(), persist(), and cacheTable() are lazily evaluated and need an action to be performed before they take effect, whereas SQL CACHE TABLE is eager.
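A small sketch of the lazy vs. eager difference, assuming a temporary view named my_table has already been registered (e.g. via df.createOrReplaceTempView("my_table")):

spark.catalog.cacheTable("my_table")    // lazy: nothing is cached yet
spark.table("my_table").count()         // this action materializes the cache

spark.sql("CACHE TABLE my_table")       // eager: the table is scanned and cached right away
spark.sql("CACHE LAZY TABLE my_table")  // the LAZY keyword makes the SQL form behave like cache()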

See here for details!

You can choose as per your requirement!

Hope this helps!

answered Oct 21 '22 by koiralo


The cache (or persist) method marks the DataFrame for caching in memory (or disk, if necessary, as the other answer says), but this happens only once an action is performed on the DataFrame, and only in a lazy fashion, i.e., if you ultimately read only 100 rows, only those 100 rows are cached. Creating a temporary table and using cacheTable is eager in the sense that it will cache the entire table immediately. Which is more performant depends on your situation. One thing that I've done with ordinary DataFrame cache is to immediately call .count() right after, forcing the DataFrame to be cached, and obviating the need to register a temp table and such.
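A sketch of that pattern (df and the status column are hypothetical):

val cached = df.cache()                   // lazy: only marks the DataFrame for caching
cached.count()                            // cheap action that forces the cache to be populated
cached.where("status = 'active'").show()  // later actions reuse the cached data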

answered Oct 21 '22 by user4601931


Spark Memory. This is the memory pool managed by Apache Spark. Its size can be calculated as ("Java Heap" – "Reserved Memory") * spark.memory.fraction, and with Spark 1.6.0 defaults this gives us ("Java Heap" – 300MB) * 0.75. For example, with a 4GB heap this pool would be 2847MB in size. This whole pool is split into 2 regions – Storage Memory and Execution Memory – and the boundary between them is set by the spark.memory.storageFraction parameter, which defaults to 0.5. The advantage of this new memory management scheme is that the boundary is not static; under memory pressure it moves, i.e. one region grows by borrowing space from the other. I will discuss moving this boundary a bit later; for now, let's focus on how this memory is used:

1. Storage Memory. This pool is used both for storing Apache Spark cached data and as temporary space for serialized data "unroll". All the "broadcast" variables are also stored there as cached blocks. In case you're curious, here's the code of unroll. As you can see, it does not require enough memory for the unrolled block to be available – if there is not enough memory to fit the whole unrolled partition, it puts it directly on disk if the desired persistence level allows this. As for "broadcast", all broadcast variables are stored in the cache with the MEMORY_AND_DISK persistence level.

2. Execution Memory. This pool is used for storing the objects required during the execution of Spark tasks. For example, it is used to store the shuffle intermediate buffer on the map side in memory, and it is also used to store the hash table for the hash aggregation step. This pool also supports spilling to disk if not enough memory is available, but the blocks from this pool cannot be forcefully evicted by other threads (tasks).

OK, so now let's focus on the moving boundary between Storage Memory and Execution Memory. Due to the nature of Execution Memory, you cannot forcefully evict blocks from this pool, because this is the data used in intermediate computations, and the process requiring this memory would simply fail if the block it refers to is not found. But this is not so for Storage Memory – it is just a cache of blocks stored in RAM; if we evict a block from there, we can just update the block metadata to reflect the fact that the block was evicted to HDD (or simply removed), and when Spark tries to access this block it will read it from HDD (or recalculate it in case your persistence level does not allow spilling to HDD).

So, we can forcefully evict a block from Storage Memory, but cannot do so from Execution Memory. When can the Execution Memory pool borrow some space from Storage Memory? It happens when either:

  • There is free space available in the Storage Memory pool, i.e. cached blocks don't use all the memory available there. In that case the Storage Memory pool simply shrinks, increasing the Execution Memory pool.
  • The Storage Memory pool size exceeds the initial Storage Memory region size and all of that space is in use. In this case blocks are forcefully evicted from the Storage Memory pool until it shrinks back to its initial size.

In turn, the Storage Memory pool can borrow some space from the Execution Memory pool only if there is free space available in the Execution Memory pool.

The initial Storage Memory region size, as you might remember, is calculated as "Spark Memory" * spark.memory.storageFraction = ("Java Heap" – "Reserved Memory") * spark.memory.fraction * spark.memory.storageFraction. With default values, this is equal to ("Java Heap" – 300MB) * 0.75 * 0.5 = ("Java Heap" – 300MB) * 0.375. For a 4GB heap this would result in 1423.5MB of RAM for the initial Storage Memory region.
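The same arithmetic as a quick sketch (Spark 1.6.0 defaults, 4GB heap assumed):

val javaHeapMb       = 4 * 1024                           // 4096 MB
val reservedMb       = 300
val sparkMemoryMb    = (javaHeapMb - reservedMb) * 0.75   // 2847.0 MB
val initialStorageMb = sparkMemoryMb * 0.5                // 1423.5 MB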

Reference: https://0x0fff.com/spark-memory-management/

answered Oct 21 '22 by Kishore