Let us assume I have a 1 TB data file. Each node in a ten-node cluster has 3 GB of memory.
I want to process the file using Spark. But how does the one terabyte fit in memory?
Will it throw an out-of-memory exception?
How does it work?
As Thilo mentioned, Spark does not need to load everything into memory to be able to process it. This is because Spark partitions the data into smaller blocks and operates on these separately. The number of partitions, and thus their size, depends on several things:
- The file system that is used to read the files (on HDFS, for example, a partition typically corresponds to one block); this also applies to files from other filesystems for which Spark uses Hadoop to read them.
- You can use repartition(N) or coalesce(N) on an RDD or a DataFrame to change the number of partitions and thus their size. coalesce is preferred for reducing the number without shuffling data across your nodes, whereas repartition allows you to specify a new way to split up your data, i.e. have better control over which parts of the data are processed on the same node.
- spark.sql.shuffle.partitions, by default set to 200, determines how many partitions a DataFrame resulting from joins and aggregations will have (a short sketch of these knobs follows this list).
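To make those knobs concrete, here is a minimal sketch; the path, partition counts and configuration value are made up for illustration, and the last line assumes a SparkSession named spark is available:
// Partition count initially follows the input splits of the underlying file system
val rdd = sc.textFile("hdfs:///data/one-tb-file")
println(rdd.getNumPartitions)

// repartition(N): full shuffle that redistributes the data across N partitions
val redistributed = rdd.repartition(4000)

// coalesce(N): merges partitions to reduce their number without a full shuffle
val compacted = redistributed.coalesce(400)

// For DataFrames: number of partitions produced by joins and aggregations
spark.conf.set("spark.sql.shuffle.partitions", "400")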
The previous bit relates to just standard processing of data in Spark, but I feel you may be led to the wrong idea because Spark is advertised as 'in-memory', so I wanted to address that a bit. By default there is nothing in Spark that is more 'in-memory' than any other data processing tool: a simple example such as sc.textFile(foo).map(mapFunc).saveAsTextFile(bar)
reads a file (block by block and distributed over your nodes), does the mapping in memory (like any computer program), and then saves it to storage again. Spark's use of memory becomes more interesting in the following (in Scala as I'm more familiar with it, but the concepts and method names are exactly the same in Python):
val rdd = sc.textFile(foo)
// Do some preprocessing, such as parsing lines
val preprocessed = rdd.map(preprocessFunc)
// Tell Spark to cache preprocessed data (by default in memory)
preprocessed.cache()
// Perform some mapping and save output
preprocessed.map(mapFunc1).saveAsTextFile(outFile1)
// Perform a different mapping and save somewhere else
preprocessed.map(mapFunc2).saveAsTextFile(outFile2)
The idea here is to use cache() so preprocessing doesn't have to be done twice (possibly); by default Spark doesn't save any intermediate results, but calculates the full chain for each separate action, where the 'actions' here are the saveAsTextFile calls.
I said 'possibly' because the ability to actually cache data is limited by the memory in your nodes. Spark reserves a certain amount of memory for cache storage, separate from work memory (see http://spark.apache.org/docs/latest/configuration.html#memory-management for how the sizes of these parts of memory are managed), and can cache only up to as much as that amount can hold.
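As a rough sketch of what those settings look like if you build the context yourself (the values are purely illustrative, not recommendations; the semantics are described on the linked configuration page):
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative memory-related settings; actual values depend on your cluster
val conf = new SparkConf()
  .setAppName("cache-sizing-example")
  .set("spark.executor.memory", "3g")          // heap available to each executor
  .set("spark.memory.fraction", "0.6")         // share of the heap used for execution + storage
  .set("spark.memory.storageFraction", "0.5")  // part of that region protected for cached data
val sc = new SparkContext(conf)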
Depending on your partitioning, the amount that can actually be cached may be even less. Let's say you have 2 GB of storage memory on each of your 3 nodes and the data in preprocessed
is 6 GB. If this data has 3 partitions, it will fit perfectly and all input data to mapFunc2
will be loaded from memory. But if you have, say, 4 partitions, each of 1.5 GB, only 1 partition can be cached on each node; the 4th partition won't fit in the 0.5 GB that is still left on each machine, so this partition has to be recalculated for the second mapping and only 3/4 of your preprocessed data will be read from memory.
So in this sense it is better to have many small partitions, to make caching as efficient as possible, but this may have other downsides: more overhead, huge delays if you happen to use Mesos with fine-grained mode, and tons of small output files (if you don't coalesce before saving), as Spark saves each partition as a separate file.
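If you want to see how this plays out on your own data, here is a small sketch (the path and partition count are made up, preprocessFunc is the same placeholder as above) that caches an RDD and then checks how many partitions actually ended up in memory:
// Aim for partitions small enough to fit in storage memory, then cache
val preprocessed = sc.textFile("hdfs:///data/input")
  .map(preprocessFunc)
  .repartition(24)
  .setName("preprocessed")
  .cache()

// An action forces the cache to be populated
preprocessed.count()

// Developer API: shows, per cached RDD, how many partitions made it into memory
sc.getRDDStorageInfo.foreach { info =>
  println(s"${info.name}: ${info.numCachedPartitions} of ${info.numPartitions} partitions cached, " +
    s"${info.memSize} bytes in memory")
}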
As Durga mentioned, there is also the possibility to have data that does not fit in memory spill to disk; you can follow his link for that :)
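For reference, one standard way to opt into that spill behaviour is to persist with a storage level that allows disk, instead of the plain cache() used above:
import org.apache.spark.storage.StorageLevel

// cache() on an RDD is shorthand for persist(StorageLevel.MEMORY_ONLY). With
// MEMORY_AND_DISK, partitions that don't fit in storage memory are spilled to
// local disk instead of being recomputed on the next action.
preprocessed.persist(StorageLevel.MEMORY_AND_DISK)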