
Understanding Spark shuffle spill

Tags:

apache-spark

If I understand correctly, when a reduce task goes about gathering its input shuffle blocks (from the outputs of different map tasks), it first keeps them in memory (Q1). When the shuffle-reserved memory of an executor (before the change in memory management (Q2)) is exhausted, the in-memory data is "spilled" to disk. If spark.shuffle.spill.compress is true, that in-memory data is written to disk in compressed form.
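
For reference, a minimal sketch of the compression settings involved; spark.shuffle.spill.compress is the one mentioned above, and the related map-output flag is shown alongside it. As far as I know both default to true, so the values here are illustrative, not a recommendation:

    import org.apache.spark.SparkConf

    // Sketch only: spill compression plus the related map-output compression flag.
    val conf = new SparkConf()
      .set("spark.shuffle.spill.compress", "true") // compress data spilled to disk during shuffles
      .set("spark.shuffle.compress", "true")       // compress the map-side shuffle output files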

My questions:

Q0: Is my understanding correct?

Q1: Is the gathered data inside the reduce task always uncompressed?

Q2: How can I estimate the amount of executor memory available for gathering shuffle blocks?

Q3: I've seen the claim that "shuffle spill happens when your dataset cannot fit in memory", but to my understanding, as long as the shuffle-reserved executor memory is big enough to contain all the (uncompressed) shuffle input blocks of all its ACTIVE tasks, no spill should occur. Is that correct?

If so, to avoid spills does one need to make sure that the (uncompressed) data that ends up in all parallel reduce-side tasks is smaller than the executor's shuffle-reserved memory?
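
To make that premise concrete, a rough illustration under the pre-1.6 static model; the executor size and task count below are made up, and the fractions are just the commonly cited defaults:

    // Illustrative numbers only (pre-1.6 static memory model).
    val executorMemoryMb = 8192.0 // spark.executor.memory = 8g, assumed
    val shuffleFraction  = 0.2    // spark.shuffle.memoryFraction (default)
    val safetyFraction   = 0.8    // spark.shuffle.safetyFraction (default)
    val activeTasks      = 4      // concurrent reduce tasks on this executor, assumed

    val shuffleReservedMb = executorMemoryMb * shuffleFraction * safetyFraction // ~1311 MB
    val perTaskBudgetMb   = shuffleReservedMb / activeTasks                     // ~328 MB of uncompressed
                                                                                // input per task before spilling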

Harel Gliksman asked Jun 16 '16

People also ask

How does Spark shuffling work?

Spark shuffles the mapped data across partitions; sometimes it also stores the shuffled data on disk for reuse when it needs to recalculate. Finally, it runs reduce tasks on each partition based on key.

What triggers a shuffle in Spark?

Transformations which can cause a shuffle include repartition operations like repartition and coalesce, 'ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.
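
A tiny illustration of one such shuffle-triggering operation, assuming a spark-shell session where sc is already available and using made-up data:

    // reduceByKey moves all records with the same key onto the same reduce-side
    // partition, which forces a shuffle across the cluster.
    val words  = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"))
    val counts = words.map(w => (w, 1)).reduceByKey(_ + _)
    counts.collect().foreach(println) // e.g. (a,3), (b,2), (c,1)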


1 Answer

Memory management differs before and after Spark 1.6. In both cases there are notions of execution memory and storage memory. The difference is that before 1.6 the split is static: a configuration parameter specifies how much memory is for execution and how much is for storage, and a spill occurs when either one is not enough.
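
A sketch of the two models for reference; the property names are standard, but the numeric values are only commonly cited defaults and may differ between Spark versions:

    import org.apache.spark.SparkConf

    // Before 1.6 (static model): fixed, separately configured slices.
    val legacyConf = new SparkConf()
      .set("spark.shuffle.memoryFraction", "0.2") // slice reserved for execution (shuffle)
      .set("spark.storage.memoryFraction", "0.6") // slice reserved for storage (caching)

    // 1.6+ (unified model): one shared region; execution and storage can borrow from each other.
    val unifiedConf = new SparkConf()
      .set("spark.memory.fraction", "0.6")        // execution + storage combined
      .set("spark.memory.storageFraction", "0.5") // part of that region protected for storage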

One of the issues that Apache Spark has to work around is the concurrent execution of:

  • different stages that are executed in parallel
  • different tasks like aggregation or sorting.

  1. I'd say that your understanding is correct.

  2. What's in memory is uncompressed, otherwise it could not be processed. Execution memory is spilled to disk in blocks and, as you mentioned, the spilled blocks can be compressed.

  3. Well, since 1.3.1 you can configure it, so you know the size up front. As for what's left at any moment in time, you can see that by looking at the executor process with something like jstat -gcutil <pid> <period>; it might give you a clue of how much memory is free there. Knowing how much memory is configured for storage and execution, and keeping default.parallelism as small as possible, also gives you a clue (see the rough estimate sketched after this list).

  4. That's true, but it's hard to reason about: there might be skew in the data (some keys having many more values than others), there are many parallel executions, and so on (see the skew check sketched after this list).
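
A back-of-the-envelope estimate for point 3, not an exact formula; the 300 MB reserved chunk and the fractions follow my understanding of the unified (1.6+) memory manager, and the executor size and task count are assumptions:

    val executorMemoryMb = 4096L // spark.executor.memory = 4g, assumed
    val reservedMb       = 300L  // memory Spark keeps for itself (approximate)
    val memoryFraction   = 0.6   // spark.memory.fraction
    val storageFraction  = 0.5   // spark.memory.storageFraction
    val concurrentTasks  = 4     // cores per executor, assumed

    val unifiedMb        = (executorMemoryMb - reservedMb) * memoryFraction // ~2278 MB shared region
    val executionFloorMb = unifiedMb * (1 - storageFraction)                // ~1139 MB guaranteed to execution
                                                                            // (it can borrow more if storage is idle)
    val perTaskMb        = executionFloorMb / concurrentTasks               // ~285 MB per concurrent task
    println(f"~$perTaskMb%.0f MB of guaranteed execution memory per task")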

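And for point 4, one quick and rough way to see whether a few keys dominate, again assuming a spark-shell session with sc available; pairRdd here is a stand-in for your real RDD:

    // Count records per key and look at the heaviest ones; a few very large
    // counts relative to the rest is a sign of skew.
    val pairRdd  = sc.parallelize(Seq("a", "a", "a", "a", "b", "c")).map(k => (k, 1L))
    val heaviest = pairRdd
      .reduceByKey(_ + _)
      .top(10)(Ordering.by(_._2)) // ten most frequent keys
    heaviest.foreach { case (k, n) => println(s"$k -> $n records") }
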
evgenii answered Sep 20 '22