I am learning Spark using the book 'Learning Spark'. I came across this term (page 54):
"We can disable map-side aggregation in combineByKey() if we know that our data won't benefit from it."
I am confused about what is meant by map-side aggregation here. The only thing that comes to my mind is Mapper & Reducer in Hadoop MapReduce, but I believe that is in no way related to Spark.
Note that map-side aggregation is not the same thing as a map-side join (broadcast join), even though the names sound similar. A map-side join is a join strategy Spark deploys when the size of one of the join relations is below a threshold (10 MB by default); the Spark SQL property that defines this threshold is spark.sql.autoBroadcastJoinThreshold.
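As a side note, that broadcast threshold is configurable at submit time. A sketch, assuming Spark SQL's standard spark.sql.autoBroadcastJoinThreshold property; my_app.jar is a placeholder application name:

```shell
# Raise the broadcast-join threshold to 50 MB (the value is in bytes);
# setting it to -1 disables broadcast joins entirely.
spark-submit \
  --conf spark.sql.autoBroadcastJoinThreshold=52428800 \
  my_app.jar
```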
Aggregations in Spark are similar to those in any relational database: they are a way to group data together so you can look at it from a higher level (figure 1, not reproduced here, shows the data before an aggregation is performed). Aggregation can be performed on tables, joined tables, views, etc.
For example, you cannot use map-side aggregation (a combiner) if you group values by key (the groupByKey operation does not use a combiner). The reason is that all values for each key must be present after the groupByKey operation has finished, so a local reduction of the values (a combiner) is not possible.
In Spark, the groupByKey function is a frequently used transformation that shuffles data. It receives key-value pairs (K, V) as input, groups the values by key, and produces a dataset of (K, Iterable<V>) pairs as output.
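The shape of groupByKey's output can be sketched with plain Scala collections (a local model of the semantics only, with no Spark involved; the input is made-up data):

```scala
// Local model of groupByKey: collect every value for each key.
// In Spark, all of these values must cross the network before the
// Iterable for a key can be produced -- no local reduction happens.
val input = Seq(("foo", 1), ("foo", 2), ("bar", 3))

val grouped: Map[String, Seq[Int]] =
  input.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2)) }
// grouped("foo") == Seq(1, 2); grouped("bar") == Seq(3)
```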
The idea behind map-side aggregation is pretty much the same as Hadoop combiners: if a single mapper can yield multiple values for the same key, you can reduce shuffling by reducing the values locally.
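This local reduction can be illustrated with plain Scala collections, treating each Seq below as one partition handled by one mapper (a toy model over made-up data, not the Spark API):

```scala
// Two "partitions" of (key, value) pairs.
val partitions = Seq(
  Seq(("foo", 1), ("foo", 1), ("bar", 3)),
  Seq(("foo", 2), ("bar", 4), ("bar", 5))
)

// Without a combiner, every record is shuffled: 6 records here.
val shuffledWithout = partitions.flatten.size

// With a combiner, each partition first sums its values per key,
// so only one record per distinct key per partition is shuffled.
val locallyReduced = partitions.map(
  _.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }
)
val shuffledWith = locallyReduced.map(_.size).sum // 4 records

// The final merge on the reduce side yields the same totals either way.
val totals = locallyReduced.flatten.groupBy(_._1).map {
  case (k, vs) => (k, vs.map(_._2).sum)
}
```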
One example of an operation that can benefit from map-side aggregation is creating a set of values for each key, especially when you partition an RDD before combining:
First, let's create some dummy data:
val pairs = sc.parallelize(
  ("foo", 1) :: ("foo", 1) :: ("foo", 2) ::
  ("bar", 3) :: ("bar", 4) :: ("bar", 5) :: Nil
)
And merge the data using combineByKey:
import collection.mutable.{Set => MSet}
import org.apache.spark.HashPartitioner

// Partition first so the combine step runs on a partitioned RDD.
val partitionedPairs = pairs.partitionBy(new HashPartitioner(2))

val combined = partitionedPairs.combineByKey(
  (v: Int) => MSet[Int](v),                           // createCombiner
  (set: MSet[Int], v: Int) => set += v,               // mergeValue
  (set1: MSet[Int], set2: MSet[Int]) => set1 ++= set2 // mergeCombiners
)
Depending on the data distribution this can significantly reduce network traffic. Overall, reduceByKey, aggregateByKey, foldByKey, and combineByKey with mapSideCombine set to true will use map-side aggregation, while groupByKey and combineByKey with mapSideCombine set to false won't.
The choice between applying map-side aggregation or not, however, is not always obvious. The cost of maintaining the required data structures and the subsequent garbage collection can in many cases exceed the cost of the shuffle.
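To see when a combiner does not pay off, consider keys that are (nearly) unique within a partition; again a plain-Scala toy model over made-up data:

```scala
// 100 distinct keys in one "partition": a local combine pass builds a
// hash map but still emits one record per input record, so the
// pre-aggregation work and its garbage are pure overhead.
val uniquePartition = (1 to 100).map(i => (s"key$i", 1))

val combined = uniquePartition.groupBy(_._1).map {
  case (k, vs) => (k, vs.map(_._2).sum)
}
// combined.size == uniquePartition.size -- nothing was saved
```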