 

'map-side' aggregation in Spark

Tags:

apache-spark

I am learning Spark using the book 'Learning Spark' and came across this sentence (page 54): "We can disable map-side aggregation in combineByKey() if we know that our data won't benefit from it." I am confused about what is meant by map-side aggregation here. The only thing that comes to mind is the Mapper and Reducer in Hadoop MapReduce, but I believe that is in no way related to Spark.

Asked Jul 08 '15 by Raj

People also ask

What is map side combine in Spark?

It is also known as a map-side join (associating worker nodes with mappers). Spark deploys this join strategy when the size of one of the join relations is below a threshold (10 MB by default). The Spark property which defines this threshold is spark.

What is aggregation in Spark?

Aggregations in Spark are similar to those in any relational database. Aggregations are a way to group data together to look at it from a higher level, as illustrated in figure 1. Aggregation can be performed on tables, joined tables, views, etc. (Figure 1: A look at the data before you perform an aggregation.)

Which transformation in Spark does not use combiner for aggregation?

For example, you cannot use map-side aggregation (a combiner) if you group values by key (the groupByKey operation does not use a combiner). The reason is that all values for each key must be present after the groupByKey operation is finished, so local reduction of values (a combiner) is not possible.

How do I use groupByKey in Spark?

In Spark, the groupByKey function is a frequently used transformation operation that performs shuffling of data. It receives key-value pairs (K, V) as input, groups the values by key, and generates a dataset of (K, Iterable[V]) pairs as output.
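A minimal sketch of the behaviour described above, assuming an existing SparkContext `sc` (as in spark-shell); the sample keys and values are made up for illustration:

```scala
// groupByKey shuffles every (K, V) pair across the network and
// collects all values for each key into an Iterable on the reduce side.
val grouped = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3))).groupByKey()
// grouped pairs each key ("a", "b") with an Iterable of its values;
// no local pre-aggregation happens before the shuffle.
```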


1 Answer

The idea behind map-side aggregation is pretty much the same as that behind Hadoop combiners: if a single mapper can yield multiple values for the same key, you can reduce shuffling by merging values locally first.

One example of an operation which can benefit from map-side aggregation is creating a set of values for each key, especially when you partition an RDD before combining:

First let's create some dummy data and partition it by key:

val pairs = sc.parallelize(
    ("foo", 1) :: ("foo", 1) :: ("foo", 2) ::
    ("bar", 3) :: ("bar", 4) :: ("bar", 5) :: Nil
)

// Pre-partition so that identical keys are co-located before combining.
val partitionedPairs = pairs.partitionBy(
    new org.apache.spark.HashPartitioner(2))

Then merge the data using combineByKey:

import collection.mutable.{Set => MSet}

val combined = partitionedPairs.combineByKey(
    (v: Int) => MSet[Int](v),                            // createCombiner: start a set from the first value
    (set: MSet[Int], v: Int) => set += v,                // mergeValue: add a value within a partition
    (set1: MSet[Int], set2: MSet[Int]) => set1 ++= set2  // mergeCombiners: merge sets across partitions
)
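To inspect the result, a sketch converting the mutable sets to immutable ones for a stable comparison (assumes `combined` from the snippet above):

```scala
// Collect the per-key sets to the driver as a Map.
val result = combined.mapValues(_.toSet).collectAsMap()
// result("foo") == Set(1, 2)  (the duplicate 1 was deduplicated)
// result("bar") == Set(3, 4, 5)
```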

Depending on the data distribution, this can significantly reduce network traffic.

Overall,

  • reduceByKey
  • combineByKey with mapSideCombine set to true
  • aggregateByKey
  • foldByKey

will use map-side aggregation, while groupByKey and combineByKey with mapSideCombine set to false won't.
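The contrast can be sketched with a simple per-key sum over the `pairs` RDD defined above; both versions give the same answer, but only the first pre-aggregates before the shuffle:

```scala
// reduceByKey: partial sums are computed within each partition
// (map-side), so only one (key, partialSum) pair per key per
// partition crosses the network.
val sums = pairs.reduceByKey(_ + _)

// groupByKey: every individual (key, value) pair is shuffled,
// and the values are only summed after the shuffle.
val sumsNoCombine = pairs.groupByKey().mapValues(_.sum)
```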

However, the choice between applying map-side aggregation or not is not always obvious: the cost of maintaining the required data structures and the subsequent garbage collection can in many cases exceed the cost of the shuffle.
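When local aggregation would not pay off, combineByKey's overload that takes a partitioner and a mapSideCombine flag lets you turn the behaviour off explicitly. A sketch, reusing the set-building combine from above on the `pairs` RDD:

```scala
import org.apache.spark.HashPartitioner
import scala.collection.mutable.{Set => MSet}

// Same combine logic as before, but with mapSideCombine = false:
// values are shuffled raw and only merged into sets on the reduce side.
val combinedNoMapSide = pairs.combineByKey(
  (v: Int) => MSet[Int](v),
  (set: MSet[Int], v: Int) => set += v,
  (s1: MSet[Int], s2: MSet[Int]) => s1 ++= s2,
  new HashPartitioner(2),
  mapSideCombine = false
)
```

The final per-key sets are identical either way; only where the merging happens (and hence the shuffle volume and per-task memory pressure) changes.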

Answered Sep 28 '22 by zero323