 

When does shuffling occur in Apache Spark?

I am optimizing parameters in Spark, and would like to know exactly how Spark is shuffling data.

Precisely, I have a simple word count program, and would like to know how spark.shuffle.file.buffer.kb affects the run time. Right now, I only see a slowdown when I make this parameter very high (I am guessing this prevents every task's buffer from fitting in memory simultaneously).

Could someone explain how Spark performs reductions? For example, the data is read and partitioned into an RDD, and when an "action" is called, Spark sends tasks out to the worker nodes. If the action is a reduction, how does Spark handle it, and how are shuffle files / buffers related to this process?
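A minimal word-count sketch of the kind described above, with the buffer parameter set explicitly (the paths are placeholders, and spark.shuffle.file.buffer.kb is the Spark 1.x name of the setting):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("WordCount")
      // Per-shuffle-file write buffer size in KB on the map side (Spark 1.x name).
      .set("spark.shuffle.file.buffer.kb", "32")
    val sc = new SparkContext(conf)

    val counts = sc.textFile("hdfs:///input/text")    // narrow: read + partition
      .flatMap(_.split("\\s+"))                       // narrow
      .map(word => (word, 1))                         // narrow
      .reduceByKey(_ + _)                             // wide: map-side combine, then shuffle

    counts.saveAsTextFile("hdfs:///output/wordcount") // action: triggers the job
    sc.stop()
  }
}
```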

asked Jul 13 '15 by cnnrznn

People also ask

What triggers a shuffle in Spark?

Transformations which can cause a shuffle include repartition operations like repartition and coalesce, ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.
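A quick sketch of which calls introduce that boundary (made-up pair RDDs, given a SparkContext sc):

```scala
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val other = sc.parallelize(Seq(("a", "x"), ("b", "y")))

pairs.map { case (k, v) => (k, v * 2) } // no shuffle: narrow dependency
pairs.repartition(8)                    // shuffle: redistributes every record
pairs.groupByKey()                      // shuffle: co-locates all values of a key
pairs.reduceByKey(_ + _)                // shuffle, but combines map-side first
pairs.join(other)                       // shuffle: co-locates matching keys
```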

What is shuffle stage in Spark?

A shuffle occurs when data is rearranged between partitions. This is required when a transformation requires information from other partitions, such as summing all the values in a column. Spark will gather the required data from each partition and combine it into a new partition, likely on a different executor.
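One way to see where that stage break falls is RDD.toDebugString, which prints the lineage with a ShuffledRDD at the boundary (a sketch, given a SparkContext sc; the exact output differs between Spark versions):

```scala
val counts = sc.textFile("hdfs:///input/text")
  .flatMap(_.split("\\s+"))
  .map(w => (w, 1))
  .reduceByKey(_ + _)

// The indentation step in the printed lineage marks the shuffle / stage boundary,
// roughly:
//   (8) ShuffledRDD[4] at reduceByKey ...
//    +-(8) MapPartitionsRDD[3] at map ...
//       |  MapPartitionsRDD[2] at flatMap ...
//       |  hdfs:///input/text MapPartitionsRDD[1] at textFile ...
println(counts.toDebugString)
```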

Does Spark require shuffling?

On the reduce side, Spark requires all shuffled data to fit into the memory of the corresponding reducer task, unlike Hadoop, which has an option to spill it to disk.

Where does Spark write shuffle data?

Currently in Spark the default shuffle process is hash-based. Usually it uses a HashMap to aggregate the shuffle data and no sort is applied. If the data needs to be sorted, the user has to call sortByKey() explicitly.
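In other words, the hash-based shuffle does not sort anything for you; ordering has to be requested explicitly (a sketch, given a SparkContext sc):

```scala
val pairs = sc.parallelize(Seq(("b", 2), ("a", 1), ("c", 3)))

pairs.reduceByKey(_ + _) // hash-based shuffle: keys aggregated, output not ordered
pairs.sortByKey()        // explicit sort: a range-partitioned shuffle with ordering
```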


2 Answers

Question: When is shuffling triggered in Spark?

Answer: Any join, cogroup, or ByKey operation involves holding objects in hashmaps or in-memory buffers to group or sort. join, cogroup, and groupByKey use these data structures in the tasks for the stages that are on the fetching side of the shuffles they trigger. reduceByKey and aggregateByKey use data structures in the tasks for the stages on both sides of the shuffles they trigger.
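For example, the difference shows up in a word count (a sketch, given a SparkContext sc): groupByKey ships every (word, 1) pair across the network and only builds its hashmap on the fetching side, whereas reduceByKey also aggregates inside each map task before the shuffle.

```scala
val words = sc.textFile("hdfs:///input/text")
  .flatMap(_.split("\\s+"))
  .map(w => (w, 1))

// Every (word, 1) pair crosses the network; grouping happens only on the fetching side:
val viaGroup = words.groupByKey().mapValues(_.sum)

// Partial sums are built in each map task's hashmap before the shuffle, then merged:
val viaReduce = words.reduceByKey(_ + _)
```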

Explanation: How does the shuffle operation work in Spark?

The shuffle operation is implemented differently in Spark compared to Hadoop. I don't know if you are familiar with how it works in Hadoop, but let's focus on Spark for now.

On the map side, each map task in Spark writes out a shuffle file (OS disk buffer) for every reducer – each of which corresponds to a logical block in Spark. These files are not intermediary in the sense that Spark does not merge them into larger partitioned ones. Since the scheduling overhead in Spark is lower, the number of mappers (M) and reducers (R) is far higher than in Hadoop. Thus, shipping M*R files to the respective reducers could result in significant overhead.
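As a rough illustration of that blow-up, and of the consolidation option covered by the SPARK-751 / "Consolidating Shuffle files" reference listed at the end of this answer (the parameter name is the Spark 1.x one for the hash shuffle manager, so treat it as an assumption for other versions):

```scala
import org.apache.spark.SparkConf

// 1,000 map tasks x 200 reducers = 200,000 shuffle files without consolidation.
// With consolidation, file groups are reused per core, so the count scales with
// (cores per node) * R instead of M * R.
val conf = new SparkConf()
  .set("spark.shuffle.consolidateFiles", "true") // hash shuffle manager, Spark 1.x only
```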

Similar to Hadoop, Spark also provides a parameter, spark.shuffle.compress, to compress map outputs; the codec (Snappy by default, or LZF) is chosen via spark.io.compression.codec. Snappy uses only 33 KB of buffer for each opened file and significantly reduces the risk of encountering out-of-memory errors.

On the reduce side, Spark requires all shuffled data to fit into the memory of the corresponding reducer task, unlike Hadoop, which has an option to spill it to disk. This of course only happens in cases where the reducer task demands all the shuffled data, for a groupByKey or a reduceByKey operation, for instance. Spark throws an out-of-memory exception in this case, which has proved quite a challenge for developers so far.

Also, unlike Hadoop, Spark has no overlapping copy phase in which mappers push data to the reducers even before the map phase is complete. This means that the shuffle is a pull operation in Spark, compared to a push operation in Hadoop. Each reducer also has to maintain a network buffer to fetch map outputs; the size of this buffer is specified through the parameter spark.reducer.maxMbInFlight (48 MB by default).
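Putting the knobs mentioned in this answer together in one place (a sketch with illustrative values; the names are the Spark 1.x ones and may differ in later releases):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.shuffle.compress", "true")       // compress map outputs
  .set("spark.io.compression.codec", "snappy") // codec used for that compression ("lzf" also valid)
  .set("spark.shuffle.file.buffer.kb", "32")   // map-side per-file write buffer, in KB
  .set("spark.reducer.maxMbInFlight", "48")    // reduce-side fetch buffer, in MB
```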

For more information about shuffling in Apache Spark, I suggest the following readings:

  • Optimizing Shuffle Performance in Spark by Aaron Davidson and Andrew Or.
  • SPARK-751 JIRA issue and Consolidating Shuffle files by Jason Dai.
answered by eliasah

It occurs whenever data needs to move between executors (worker nodes).

answered by coding99