I have the following Spark job, trying to keep everything in memory:
    val myOutRDD = myInRDD.flatMap { fp =>
      val tuple2List: ListBuffer[(String, myClass)] = ListBuffer()
      :
      tuple2List
    }.persist(StorageLevel.MEMORY_ONLY)
     .reduceByKey { (p1, p2) =>
       myMergeFunction(p1, p2)
     }.persist(StorageLevel.MEMORY_ONLY)
However, when I looked into the job tracker, I still see a lot of Shuffle Write and Shuffle spill to disk ...
    Total task time across all tasks: 49.1 h
    Input Size / Records: 21.6 GB / 102123058
    Shuffle write: 532.9 GB / 182440290
    Shuffle spill (memory): 370.7 GB
    Shuffle spill (disk): 15.4 GB
Then the job failed with "no space left on device".
... I am wondering, for the 532.9 GB of Shuffle write here, is it written to disk or to memory?
Also, why are there still 15.4 GB of data spilled to disk when I specifically asked to keep them in memory?
Thanks!
Spark will gather the required data from each partition and combine it into a new partition, likely on a different executor. During a shuffle, data is written to disk and transferred across the network, halting Spark's ability to do processing in-memory and causing a performance bottleneck.
Spark's operators spill data to disk if it does not fit in memory, allowing it to run well on any sized data. Likewise, cached datasets that do not fit in memory are either spilled to disk or recomputed on the fly when needed, as determined by the RDD's storage level.
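Which of the two happens for a cached dataset depends on the storage level you choose. A minimal sketch (the sc SparkContext, input path, and RDD names are placeholders, not taken from the question):

    import org.apache.spark.storage.StorageLevel

    val lines = sc.textFile("hdfs:///data/input")

    // MEMORY_ONLY: partitions that do not fit in memory are simply not cached
    // and are recomputed from the lineage the next time they are needed.
    val recomputed = lines.map(_.toUpperCase).persist(StorageLevel.MEMORY_ONLY)

    // MEMORY_AND_DISK: partitions that do not fit in memory are written to
    // local disk instead of being recomputed.
    val spilled = lines.map(_.toLowerCase).persist(StorageLevel.MEMORY_AND_DISK)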
Shuffle write happens in one stage, while shuffle read happens in the subsequent stage. Further, the shuffle write operation is executed independently for each input partition that needs to be shuffled, and similarly, the shuffle read operation is executed independently for each shuffled partition.
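You can see that stage boundary by printing the lineage of a shuffled RDD. The sketch below is a generic word count, not the asker's job, and assumes an existing SparkContext sc; the ShuffledRDD entry in the output marks where the map side's shuffle write ends and the reduce side's shuffle read begins:

    // reduceByKey forces a shuffle and therefore a new stage.
    val counts = sc.textFile("hdfs:///data/input")
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    // The lineage shows the ShuffledRDD that separates the two stages.
    println(counts.toDebugString)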
Shuffle spill happens when there is not enough memory for the shuffle data. Because deserialized data occupies more space than serialized data, Shuffle spill (memory) is larger than Shuffle spill (disk). Note that this spill size can become incredibly large with big input data.
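If the deserialized objects are what blow up your memory, one mitigation (a sketch, assuming myClass is Kryo-serializable; the app name, path, and parse helper are hypothetical) is to cache in serialized form and register your classes with Kryo, since MEMORY_ONLY_SER keeps each partition as one compact byte array:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    val conf = new SparkConf()
      .setAppName("serialized-cache-sketch")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[myClass]))
    val sc = new SparkContext(conf)

    // Serialized caching trades extra CPU for a much smaller memory footprint.
    val cached = sc.textFile("hdfs:///data/input")
      .map(line => (line, parse(line)))   // parse is a hypothetical helper returning myClass
      .persist(StorageLevel.MEMORY_ONLY_SER)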
The persist calls in your code are entirely wasted if you don't access the RDD multiple times. What's the point of storing something if you never access it? Caching has no bearing on shuffle behavior, other than that you can avoid re-doing shuffles by keeping their output cached.
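For contrast, here is a sketch of a case where persist does pay off (parseEvent and the path are hypothetical): the cached RDD is referenced by two actions, so the second action reads the cached partitions instead of recomputing the whole lineage:

    import org.apache.spark.storage.StorageLevel

    val parsed = sc.textFile("hdfs:///data/events")
      .map(parseEvent)                       // hypothetical parsing function
      .persist(StorageLevel.MEMORY_ONLY)

    val total  = parsed.count()              // first action: computes and caches
    val sample = parsed.take(10)             // second action: served from the cache

    parsed.unpersist()                       // release the cached blocks when done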
Shuffle spill is controlled by the spark.shuffle.spill and spark.shuffle.memoryFraction configuration parameters. If spilling is enabled (it is by default), then shuffle data will spill to disk once it starts using more memory than allowed by spark.shuffle.memoryFraction (20% by default).
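For example, on the older Spark releases where these keys are honored (from Spark 1.6 on, spark.shuffle.spill is ignored and spark.shuffle.memoryFraction only applies in legacy memory mode), they can be set on the SparkConf or passed with --conf at submit time; the values below are only an illustration:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("shuffle-tuning-sketch")
      .set("spark.shuffle.spill", "true")          // default; "false" disables spilling
      .set("spark.shuffle.memoryFraction", "0.4")  // default is 0.2 (20% of the heap)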
The metrics are very confusing. My reading of the code is that "Shuffle spill (memory)" is the amount of memory that was freed up as things were spilled to disk. The code for "Shuffle spill (disk)" looks like it's the amount actually written to disk. Going by the code, I think "Shuffle write" is the amount written to disk directly, not as a spill from a sorter.