I would like help understanding shuffle in depth and how Spark uses shuffle managers. Here are some resources I found very helpful:
https://trongkhoanguyenblog.wordpress.com/
https://0x0fff.com/spark-architecture-shuffle/
https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/4-shuffleDetails.md
Reading them, I understood that there are different shuffle managers. I want to focus on two of them: the hash manager and the sort manager (which is the default manager).
To frame my question, I want to start from a very common transformation:
val reduced = pairs.reduceByKey(_ + _)  // pairs: any RDD of (key, value) pairs
This transformation causes map-side aggregation and then a shuffle that brings all records with the same key into the same partition.
My questions are:
1. Is map-side aggregation implemented internally with a mapPartitions transformation, aggregating all records with the same key using the combiner function, or is it implemented with an AppendOnlyMap or ExternalAppendOnlyMap?
2. If AppendOnlyMap or ExternalAppendOnlyMap is used for aggregating, is it also used for the reduce-side aggregation that happens in the ResultTask?
3. What exactly is the purpose of these two kinds of maps (AppendOnlyMap and ExternalAppendOnlyMap)?
4. Are AppendOnlyMap and ExternalAppendOnlyMap used by all shuffle managers or only by the sort manager?
5. I read that once an AppendOnlyMap or ExternalAppendOnlyMap is full, it is spilled to a file. How exactly does this step happen?
6. With the sort shuffle manager, an AppendOnlyMap is used to aggregate and combine partition records, right? Then, when execution memory fills up, Spark sorts the map, spills it to disk, and clears the map. My question is: what is the difference between a spill to disk and a shuffle write? Both basically create files on the local file system, but they are treated differently, and shuffle write records are not put into the AppendOnlyMap.
7. Can you explain in depth what happens when reduceByKey is executed, covering all the steps involved, such as map-side aggregation, shuffling, and so on?
Here is a step-by-step description of reduceByKey:
1. reduceByKey calls combineByKeyWithClassTag with an identity createCombiner and the same reduce function for both mergeValue and mergeCombiners.
2. combineByKeyWithClassTag creates an Aggregator and returns a ShuffledRDD. Both the "map" and "reduce" side aggregations use an internal mechanism and do not use mapPartitions.
3. Aggregator uses ExternalAppendOnlyMap for both combineValuesByKey ("map side reduction") and combineCombinersByKey ("reduce side reduction").
4. Both methods use the ExternalAppendOnlyMap.insertAll method.
5. ExternalAppendOnlyMap keeps track of the spilled parts and the current in-memory map (SizeTrackingAppendOnlyMap).
6. The insertAll method updates the in-memory map and checks on each insert whether the estimated size of the current map exceeds the threshold. It uses the inherited Spillable.maybeSpill method. If the threshold is exceeded, this method calls spill as a side effect, and insertAll initializes a clean SizeTrackingAppendOnlyMap.
7. spill calls spillMemoryIteratorToDisk, which gets a DiskBlockObjectWriter object from the block manager.
The insertAll steps are applied to both the map-side and reduce-side aggregations, each with the corresponding Aggregator functions, with the shuffle stage in between.
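To make the flow above more concrete, here is a minimal sketch in plain Scala with no Spark dependency. It is not Spark's code: ShuffleSketch, ToySpillableMap, reduceByKeySketch and maxKeysInMemory are hypothetical names, the "spills" are kept in memory instead of being written to disk, and the threshold is a simple key count rather than an estimated byte size. It only mirrors the shape of the steps: insert into a map, spill when the map grows too large, start a clean map, merge everything at the end, and run the same routine on both sides of the shuffle.

```scala
import scala.collection.mutable

object ShuffleSketch {

  // Toy stand-in for a spillable aggregation map (hypothetical, not Spark's class).
  // Spilled parts are plain in-memory vectors here; Spark writes them to local disk.
  final class ToySpillableMap[K, V, C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      maxKeysInMemory: Int) { // assumed threshold: number of keys, not estimated bytes

    private var current = mutable.HashMap.empty[K, C]
    private val spills = mutable.ArrayBuffer.empty[Vector[(K, C)]]

    def spillCount: Int = spills.size

    // Update the in-memory map, then check the threshold after every insert.
    def insertAll(records: Iterator[(K, V)]): Unit =
      records.foreach { case (k, v) =>
        current.get(k) match {
          case Some(c) => current(k) = mergeValue(c, v)
          case None    => current(k) = createCombiner(v)
        }
        maybeSpill()
      }

    private def maybeSpill(): Unit =
      if (current.size > maxKeysInMemory) {
        spills += current.toVector            // "spill" the current map
        current = mutable.HashMap.empty[K, C] // continue with a clean map
      }

    // Merge the spilled parts with whatever is still in memory.
    def result: Map[K, C] =
      (spills.flatten ++ current).groupBy(_._1).map { case (k, kcs) =>
        k -> kcs.map(_._2).reduce(mergeCombiners)
      }
  }

  // reduceByKey-like flow: map-side combine, shuffle by key hash, reduce-side combine.
  def reduceByKeySketch[K, V](
      partitions: Seq[Seq[(K, V)]],
      numReducers: Int,
      maxKeysInMemory: Int)(f: (V, V) => V): Map[K, V] = {
    // Map side: identity createCombiner, f merges a value into a combiner.
    val mapOutputs = partitions.map { part =>
      val m = new ToySpillableMap[K, V, V]((v: V) => v, f, f, maxKeysInMemory)
      m.insertAll(part.iterator)
      m.result
    }
    // Shuffle: route every combined record to a reducer chosen by key hash.
    val shuffled = mapOutputs.flatten.groupBy { case (k, _) =>
      Math.floorMod(k.hashCode, numReducers)
    }
    // Reduce side: the incoming records are already combiners, merged again with f.
    shuffled.values.flatMap { records =>
      val m = new ToySpillableMap[K, V, V]((v: V) => v, f, f, maxKeysInMemory)
      m.insertAll(records.iterator)
      m.result
    }.toMap
  }
}
```

For example, ShuffleSketch.reduceByKeySketch(Seq(Seq("a" -> 1, "b" -> 1, "a" -> 1), Seq("b" -> 1, "c" -> 1, "a" -> 1)), numReducers = 2, maxKeysInMemory = 1)(_ + _) sums the values per key even though every map-side partition spills once along the way. The point of the sketch is that a spill is an internal detail of the aggregation map, while the records handed to the shuffle are the merged result of the map plus its spills.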
As of Spark 2.0 there is only the sort-based shuffle manager: SPARK-14667