 

Spark Execution of TB file in memory

Let us assume I have a 1 TB data file. Each node in a ten-node cluster has 3 GB of memory.

I want to process the file using Spark. But how does the one terabyte fit in memory?

Will it throw an out-of-memory exception?

How does it work?

asked Jan 16 '16 by yoga


1 Answer

As Thilo mentioned, Spark does not need to load everything into memory to be able to process it. This is because Spark will partition the data into smaller blocks and operate on these separately. The number of partitions, and thus their size, depends on several things:

  • Where the file is stored. The most commonly used options with Spark already store the file as a bunch of blocks rather than as a single big piece of data. If it's stored in HDFS, for instance, these blocks are 64MB by default and they are distributed (and replicated) across your nodes. With files stored in S3 you will get blocks of 32MB. This is determined by the Hadoop FileSystem used to read the files, and it applies to files from other filesystems for which Spark uses Hadoop to read them.
  • Any repartitioning you do. You can call repartition(N) or coalesce(N) on an RDD or a DataFrame to change the number of partitions and thus their size. coalesce is preferred for reducing the number without shuffling data across your nodes, whereas repartition lets you specify a new way to split up your data, i.e. gives you better control over which parts of the data are processed on the same node.
  • Some more advanced transformations you may do. For instance, the configuration setting spark.sql.shuffle.partitions, set to 200 by default, determines how many partitions a DataFrame resulting from joins and aggregations will have (see the short sketch after this list).
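A minimal sketch of these knobs in action (the application name, path, and partition counts below are made up purely for illustration):

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical app name and path, just to show where each setting goes
val conf = new SparkConf()
  .setAppName("partitioning-example")
  // DataFrames produced by joins/aggregations get 400 shuffle partitions instead of the default 200
  .set("spark.sql.shuffle.partitions", "400")
val sc = new SparkContext(conf)

// The partition count of a freshly read file follows the underlying storage blocks
val rdd = sc.textFile("hdfs:///data/big-file.txt")
println(s"partitions after read: ${rdd.getNumPartitions}")

// repartition shuffles the data into a new number of partitions
val finer = rdd.repartition(1000)

// coalesce reduces the partition count without a full shuffle
val coarser = finer.coalesce(100)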

Caching

The previous bit relates just to standard processing of data in Spark, but I feel you may be led to the wrong idea because of Spark being advertised as 'in-memory', so I wanted to address that a bit. By default there is nothing in Spark that is more 'in-memory' than any other data processing tool: a simple job such as sc.textFile(foo).map(mapFunc).saveAsTextFile(bar) reads a file (block by block and distributed over your nodes), does the mapping in memory (like any computer program) and then saves it to storage again. Spark's use of memory becomes more interesting in the following (in Scala as I'm more familiar with it, but the concepts and method names are exactly the same in Python):

val rdd = sc.textFile(foo)
// Do some preprocessing, such as parsing lines
val preprocessed = rdd.map(preprocessFunc)
// Tell Spark to cache preprocessed data (by default in memory)
preprocessed.cache()
// Perform some mapping and save output
preprocessed.map(mapFunc1).saveAsTextFile(outFile1)
// Perform a different mapping and save somewhere else
preprocessed.map(mapFunc2).saveAsTextFile(outFile2)

The idea here is to use cache() so the preprocessing (possibly) doesn't have to be done twice; by default Spark doesn't save any intermediate results but recalculates the full chain for each separate action, where the 'actions' here are the saveAsTextFile calls.

I said 'possibly' because the ability to actually cache data is limited by the memory in your nodes. Spark reserves a certain amount of memory for cache storage, separate from work memory (see http://spark.apache.org/docs/latest/configuration.html#memory-management for how the sizes of these parts of memory are managed), and it can cache only as much as that amount can hold.
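As a rough sketch of the settings involved (the values here are only illustrative; the linked configuration page describes the actual defaults and semantics):

import org.apache.spark.SparkConf

// Illustrative values only, not recommendations
val memConf = new SparkConf()
  // fraction of the JVM heap shared by execution (work) and storage (cache) memory
  .set("spark.memory.fraction", "0.6")
  // fraction of that region reserved for cached data
  .set("spark.memory.storageFraction", "0.5")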

Depending on your partitioning it may be even less, though. Let's say you have 2GB of storage memory on each of your 3 nodes and the data in preprocessed is 6GB. If this data has 3 partitions, it will fit perfectly and all input data to mapFunc2 will be loaded from memory. But if you have, say, 4 partitions of 1.5GB each, only 1 partition can be cached on each node; the 4th partition won't fit in the 0.5GB that is still left on each machine, so that partition has to be recalculated for the second mapping and only 3/4 of your preprocessed data will be read from memory.
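One way around that, sketched below with the hypothetical numbers from this example, is to repartition into smaller chunks before caching:

// With ~6GB of preprocessed data, twelve ~0.5GB partitions pack into the
// 3 x 2GB of storage memory more evenly than four 1.5GB partitions would
val repacked = preprocessed.repartition(12)
repacked.cache()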

So in this sense it is better to have many small partitions, to make caching as efficient as possible, but this may have other downsides: more overhead, huge delays if you happen to use Mesos with fine-grained mode, and tons of small output files (if you don't coalesce before saving) because Spark saves each partition as a separate file.
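The last downside is easy to avoid by coalescing just before the write; a small sketch reusing the names from the caching example above:

// Collapse to 10 partitions right before writing, so the job produces 10 output
// files instead of one (possibly tiny) file per partition
preprocessed
  .map(mapFunc1)
  .coalesce(10)
  .saveAsTextFile(outFile1)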

As Durga mentioned, there is also the possibility to have data that does not fit in memory spill to disk; you can follow his link for that :)
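For cached data specifically, you can ask Spark to spill partitions that don't fit to disk by using persist with an explicit storage level instead of the plain cache() call above; a minimal sketch:

import org.apache.spark.storage.StorageLevel

// MEMORY_AND_DISK keeps what fits in memory and spills the remaining partitions
// to disk instead of dropping them and recomputing later
preprocessed.persist(StorageLevel.MEMORY_AND_DISK)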

answered Sep 23 '22 by sgvd