 

Understanding shuffle managers in Spark

I am trying to understand shuffle in depth and how Spark uses shuffle managers. Here are some very helpful resources I found:

https://trongkhoanguyenblog.wordpress.com/

https://0x0fff.com/spark-architecture-shuffle/

https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/4-shuffleDetails.md

Reading them, I understood that there are different shuffle managers. I want to focus on two of them: the hash manager and the sort manager (which is the default manager).

To frame my question, I want to start from a very common transformation:

val rdd2 = rdd.reduceByKey(_ + _)

This transformation triggers map-side aggregation and then a shuffle to bring all records with the same key into the same partition.
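For reference, here is a minimal, self-contained sketch of that transformation; the object name, sample data and local master are my own assumptions and are not part of the original question:

    import org.apache.spark.{SparkConf, SparkContext}

    object ReduceByKeyExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("reduceByKey-example").setMaster("local[2]")
        val sc = new SparkContext(conf)

        // Hypothetical sample data: (word, 1) pairs spread over 3 partitions.
        val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1), ("b", 1), ("a", 1)), 3)

        // Map-side aggregation first combines values per key within each partition;
        // the shuffle then brings each key's partial sums to a single partition,
        // where they are merged into the final counts.
        val reduced = pairs.reduceByKey(_ + _)

        reduced.collect().foreach(println) // e.g. (a,3), (b,2)
        sc.stop()
      }
    }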

My questions are:

  • Is map-side aggregation implemented internally using a mapPartitions transformation (aggregating all values of the same key with the combiner function), or is it implemented with an AppendOnlyMap or ExternalAppendOnlyMap?

  • If AppendOnlyMap or ExternalAppendOnlyMap is used for aggregation, is it also used for the reduce-side aggregation that happens in the ResultTask?

  • What exactly is the purpose of these two kinds of maps (AppendOnlyMap and ExternalAppendOnlyMap)?

  • Are AppendOnlyMap and ExternalAppendOnlyMap used by all shuffle managers, or just by the sort manager?

  • I read that once an AppendOnlyMap or ExternalAppendOnlyMap is full it is spilled into a file; how exactly does this step happen?

  • Using the sort shuffle manager, we use an AppendOnlyMap for aggregating and combining partition records, right? Then, when execution memory fills up, we start sorting the map, spilling it to disk and then cleaning up the map. My question is: what is the difference between spill to disk and shuffle write? Both essentially consist of creating files on the local file system, but they are treated differently, and shuffle-write records are not put into the AppendOnlyMap.

  • Can you explain in depth what happens when reduceByKey is executed, covering all the steps involved to accomplish it? For example, all the steps for map-side aggregation, shuffling and so on.

asked Jan 11 '17 by Giorgio


People also ask

How does shuffle work in Spark?

The results of the map tasks are kept in memory. When the results do not fit in memory, Spark stores the data on disk. Spark shuffles the mapped data across partitions, and sometimes it also stores the shuffled data on disk for reuse when it needs to recalculate. Finally, it runs reduce tasks on each partition based on the key.

How do you optimize shuffling in Spark?

The next strategy is to reduce the amount of data being shuffled as a whole. A few of the things we can do are: get rid of the columns you don't need, filter out unnecessary records, and optimize data ingestion. De-normalize the datasets, specifically if the shuffle is caused by a join.
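A hedged illustration of that advice with the DataFrame API; the paths, column names and filter condition below are hypothetical and only show the "project and filter before the join" idea:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("shuffle-reduction").master("local[*]").getOrCreate()
    import spark.implicits._

    // Hypothetical inputs; the paths and schemas are assumptions for illustration only.
    val orders    = spark.read.parquet("/data/orders")
    val customers = spark.read.parquet("/data/customers")

    // Project and filter *before* the join so that less data is shuffled.
    val slimOrders = orders
      .select("customer_id", "amount")   // drop columns we don't need
      .filter($"amount" > 0)             // drop records we don't need

    // Only the slimmed-down columns and rows take part in the shuffle caused by the join.
    val joined = slimOrders.join(customers.select("customer_id", "country"), Seq("customer_id"))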

How many operations are involved in Spark shuffle?

Currently, there are three different implementations of shuffles in Spark, each with its own advantages and drawbacks. Therefore, there will be three follow-up articles, one on each of the shuffles.

What is shuffle sort in Spark?

Shuffle sort-merge join involves shuffling the data so that rows with the same join_key end up on the same worker, and then performing a sort-merge join operation at the partition level in the worker nodes. Things to note: since Spark 2.3 this is the default join strategy in Spark, and it can be disabled through Spark configuration.
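A small, hypothetical example of a join that should use the sort-merge strategy; the generated data and settings are assumptions made only to surface SortMergeJoin in the physical plan:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("smj-example").master("local[*]").getOrCreate()
    import spark.implicits._

    // Make sure neither side is broadcast, so the sort-merge join strategy is used.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

    // Made-up data, just to produce two shuffled and sorted sides of a join.
    val left  = (1 to 100000).map(i => (i, s"left_$i")).toDF("join_key", "l_val")
    val right = (1 to 100000).map(i => (i, s"right_$i")).toDF("join_key", "r_val")

    val joined = left.join(right, Seq("join_key"))
    // The physical plan should show SortMergeJoin preceded by Exchange (shuffle) and Sort on each side.
    joined.explain()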


1 Answer

What follows is a step-by-step description of reduceByKey:

  1. reduceByKey calls combineByKeyWithClassTag, with an identity create-combiner and the same function used for both mergeValue and mergeCombiners
  2. combineByKeyWithClassTag creates an Aggregator and returns a ShuffledRDD. Both "map"- and "reduce"-side aggregations use this internal mechanism and don't utilize mapPartitions (see the sketch right after this list)
  3. Aggregator uses ExternalAppendOnlyMap for both combineValuesByKey ("map side reduction") and combineCombinersByKey ("reduce side reduction")
  4. Both methods use the ExternalAppendOnlyMap.insertAll method
  5. ExternalAppendOnlyMap keeps track of spilled parts and the current in-memory map (SizeTrackingAppendOnlyMap)
  6. The insertAll method updates the in-memory map and checks on insert whether the estimated size of the current map exceeds the threshold. It uses the inherited Spillable.maybeSpill method. If the threshold is exceeded, this method calls spill as a side effect, and insertAll initializes a clean SizeTrackingAppendOnlyMap (a toy model of this cycle is sketched further below)
  7. spill calls spillMemoryIteratorToDisk, which gets a DiskBlockObjectWriter object from the block manager.
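To make steps 1-2 concrete, here is a condensed paraphrase of the PairRDDFunctions code path; it is abridged and simplified, not verbatim source from any particular Spark version (serializer handling and the fast path for an already-partitioned RDD are omitted):

    // Condensed paraphrase of steps 1-2, inside PairRDDFunctions[K, V]:
    def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] =
      // createCombiner is the identity, and the same `func` is used both for
      // mergeValue (value into combiner) and mergeCombiners (combiner into combiner)
      combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)

    def combineByKeyWithClassTag[C](
        createCombiner: V => C,
        mergeValue: (C, V) => C,
        mergeCombiners: (C, C) => C,
        partitioner: Partitioner): RDD[(K, C)] = {
      val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
      // No mapPartitions here: the ShuffledRDD carries the Aggregator, and the shuffle
      // machinery applies it on the map side (combineValuesByKey) and on the
      // reduce side (combineCombinersByKey)
      new ShuffledRDD[K, V, C](self, partitioner)
        .setAggregator(aggregator)
        .setMapSideCombine(true)
    }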

The insertAll steps described above are applied for both map- and reduce-side aggregations, with the corresponding Aggregator functions and the shuffle stage in between.
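As a rough illustration of the insert/spill cycle in items 5-7, here is a toy, self-contained model; the class and method names are invented, and only the general flow (update an in-memory map, estimate its size, spill to a local file and reset the map once a threshold is exceeded) mirrors what ExternalAppendOnlyMap and Spillable.maybeSpill do:

    import java.io.{File, PrintWriter}
    import scala.collection.mutable

    // Toy model of the insert / size-check / spill cycle; not Spark code.
    class SimpleSpillableMap[K, V](mergeValue: (V, V) => V, spillThresholdBytes: Long) {
      private var currentMap = mutable.Map.empty[K, V]
      private val spillFiles = mutable.ArrayBuffer.empty[File]

      // Very rough size estimate; Spark instead uses a SizeTrackingAppendOnlyMap
      // backed by SizeEstimator samples.
      private def estimatedSize: Long = currentMap.size.toLong * 64L

      def insertAll(records: Iterator[(K, V)]): Unit =
        records.foreach { case (k, v) =>
          currentMap.update(k, currentMap.get(k).map(mergeValue(_, v)).getOrElse(v))
          maybeSpill()
        }

      private def maybeSpill(): Unit =
        if (estimatedSize > spillThresholdBytes) {
          // Sort the in-memory records (the sort manager sorts by partition id / key),
          // write them to a temporary local file ...
          val file = File.createTempFile("toy-spill-", ".tmp")
          val out = new PrintWriter(file)
          try currentMap.toSeq.sortBy(_._1.hashCode).foreach { case (k, v) => out.println(s"$k\t$v") }
          finally out.close()
          spillFiles += file
          // ... and continue inserting into a fresh, empty in-memory map.
          currentMap = mutable.Map.empty[K, V]
        }
    }

Reading back and merging the spill files (which Spark does with a merge over the spilled, sorted segments plus the in-memory map) is omitted here for brevity.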

As of Spark 2.0 there is only the sort-based manager: SPARK-14667.
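For completeness, in Spark 1.x the manager could be chosen explicitly via the spark.shuffle.manager setting, as sketched below; since 2.0 the sort-based manager is the only built-in one, so the "hash" value is no longer accepted:

    import org.apache.spark.{SparkConf, SparkContext}

    // Spark 1.x only: pick the shuffle manager explicitly.
    // Since Spark 2.0 (SPARK-14667) the hash-based manager has been removed,
    // and "sort" is the only built-in implementation.
    val conf = new SparkConf()
      .setAppName("shuffle-manager-example")
      .setMaster("local[2]")
      .set("spark.shuffle.manager", "sort") // was "hash" for the hash-based manager
    val sc = new SparkContext(conf)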

answered Sep 22 '22 by user7337271