Spark: Is the memory required to create a DataFrame somewhat equal to the size of the input data?

Tags:

apache-spark

I'm having a hard time telling whether I need 1TB of memory to load a Spark DataFrame backed by a 1TB database table. Is that the case?

In another question I was told I wasn't doing things in a very Spark-like way, because I started by chunking the data myself: iteratively creating DataFrames and then working on those subsets. The problem was that this was far too slow because not enough parallel processing was happening. Based on that feedback, I tried loading an entire table and then using Spark partitioning/grouping/sorting to get what I need out in the order I need it, but as far as I can tell it just fills up as much memory as I allocate (on my local test machine), despite having up to thousands of partitions for what is (in my case) a tiny database table at 30GB.
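For context, this is roughly the shape of that second attempt (a minimal sketch; the JDBC URL, table, column names and bounds below are placeholders, not my real setup):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("partitioned-read").getOrCreate()

    // Partitioned JDBC read: Spark opens numPartitions connections in parallel
    // and each one reads only its slice of the id range, rather than issuing
    // one giant query for the whole table.
    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql://db-host:5432/mydb") // placeholder URL
      .option("dbtable", "big_table")                       // placeholder table
      .option("user", "reader")
      .option("password", "secret")
      .option("partitionColumn", "id")   // numeric column to split on
      .option("lowerBound", "1")
      .option("upperBound", "100000000")
      .option("numPartitions", "1000")
      .load()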

It has been driving me a bit crazy. I've done a ton of searching and reading of articles and documentation over the last few weeks, and I'm having a really hard time finding examples of Spark usage that access what I would call even vaguely "big" datasets, particularly when it comes to DataFrames and real databases as input. On top of that, being told I shouldn't be manually chunking any data makes me think there must be some magic happening where not all of the data is actually selected at once. Any useful resources in this regard would be much appreciated.

asked Dec 14 '16 by Project707
1 Answer

You should definitely cache() RDDs and DataFrames in the following cases (a short sketch follows the list):

  • When you reuse them in an iterative loop
  • When you reuse the RDD multiple times in a single application or job
  • When the upfront cost of regenerating the RDD partitions is high (e.g. reading from HDFS, or a complex chain of map(), filter(), etc.); caching also helps recovery if a Worker node dies
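As a minimal sketch of the first two cases (the names below are made up, assuming a DataFrame df that is reused by more than one action):

    import org.apache.spark.sql.functions.col

    // Hypothetical subset of the data that gets reused
    val active = df.filter(col("status") === "active")

    // cache() only marks it for storage; the first action below materializes
    // the cached partitions, the second one reuses them instead of re-reading
    // the source. Equivalent to active.persist(StorageLevel.MEMORY_ONLY).
    active.cache()
    val total    = active.count()
    val byRegion = active.groupBy("region").count().collect()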

Keep in mind that Spark will automatically evict RDD partitions from Workers in an LRU manner. The LRU eviction happens independently on each Worker and depends on the available memory in the Worker.

During the lifecycle of an RDD, RDD partitions may exist in memory or on disk across the cluster depending on available memory.

The Storage tab on the Spark UI shows where partitions exist (memory or disk) across the cluster at any given point in time.

Note that for an RDD, cache() is an alias for persist(StorageLevel.MEMORY_ONLY), which may not be ideal for datasets larger than the available cluster memory. Each RDD partition that is evicted from memory has to be rebuilt from the source (e.g. HDFS, the network, etc.), which is expensive.

A better solution is to use persist(StorageLevel.MEMORY_AND_DISK), which spills RDD partitions to the Worker's local disk when they don't fit in memory. In that case, rebuilding a partition only requires pulling data from the Worker's local disk, which is relatively fast.
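For example, assuming the same df as above, that would look like this (a sketch, not specific to your table):

    import org.apache.spark.storage.StorageLevel

    // Partitions that don't fit in executor memory are written to the Worker's
    // local disk instead of being dropped and recomputed from the source.
    df.persist(StorageLevel.MEMORY_AND_DISK)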

You also have the option of persisting the data as serialized byte arrays by appending _SER, as in MEMORY_ONLY_SER and MEMORY_AND_DISK_SER. This saves space but incurs an extra serialization/deserialization penalty. Because the data is stored as serialized byte arrays, fewer Java objects are created and GC pressure is therefore reduced.
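A sketch of the serialized levels on an RDD (assuming the df from above; note that a given RDD or DataFrame can only be persisted at one storage level at a time):

    import org.apache.spark.storage.StorageLevel

    val rdd = df.rdd  // assumption: working with the RDD behind the DataFrame

    // Stored as serialized byte arrays: more compact, fewer JVM objects and
    // less GC pressure, but every access pays a deserialization cost.
    rdd.persist(StorageLevel.MEMORY_ONLY_SER)
    // Or, to also spill serialized partitions to the Worker's local disk:
    // rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)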

See the Spark documentation on storage levels for the full list.

answered Sep 22 '22 by Indrajit Swain