 

How does Spark aggregate function - aggregateByKey work?

Say I have a distributed system with 3 nodes and my data is distributed among those nodes. For example, I have a test.csv file that exists on all 3 nodes and contains 2 columns:

| row   | id | c   |
|-------|----|-----|
| row1  | k1 | c1  |
| row2  | k1 | c2  |
| row3  | k1 | c3  |
| row4  | k2 | c4  |
| row5  | k2 | c5  |
| row6  | k2 | c6  |
| row7  | k3 | c7  |
| row8  | k3 | c8  |
| row9  | k3 | c9  |
| row10 | k4 | c10 |
| row11 | k4 | c11 |
| row12 | k4 | c12 |

Then I use SparkContext.textFile to read the file in as an RDD. As far as I understand, each Spark worker node will read a portion of the file. So right now let's say each node stores:

  • node 1: row 1~4
  • node 2: row 5~8
  • node 3: row 9~12

My question is: say I want to do some computation on this data, and one step requires grouping by key, so the key-value pairs would be [k1 [{k1 c1} {k1 c2} {k1 c3}]] and so on.
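
For concreteness, a minimal sketch of that setup (not part of the original question; the file name and the comma delimiter are assumptions):

```scala
// Read the file as an RDD of lines, e.g. "k1,c1", and map to (id, c) pairs
val rdd = sc.textFile("test.csv")
val pairs = rdd.map { line =>
  val Array(id, c) = line.split(",").map(_.trim)
  (id, c)                      // (k1,c1), (k1,c2), ...
}
// Grouping by key should then produce something shaped like
// (k1, [c1, c2, c3]), (k2, [c4, c5, c6]), ...
```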

There is a function called groupByKey(), which is said to be very expensive, and aggregateByKey() is recommended instead. So I'm wondering: how do groupByKey() and aggregateByKey() work under the hood? Can someone use the example I provided above to explain? After shuffling, where do the rows reside on each node?

asked Jul 17 '14 by EdwinGuo


People also ask

How does aggregateByKey work in Spark?

aggregateByKey is one of the aggregation functions available in Spark (others are reduceByKey and groupByKey). It is the only one of these that allows multiple types of aggregation (maximum, minimum, average, sum and count) at the same time.
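
As a rough illustration (not from the linked answer; the data is made up), the accumulator passed to aggregateByKey can be a tuple, so one pass can keep a maximum, minimum, sum and count per key:

```scala
// Sketch: (max, min, sum, count) per key in a single aggregateByKey
val nums = sc.parallelize(Seq(("a", 3), ("a", 1), ("b", 7), ("a", 5)))

val zero = (Int.MinValue, Int.MaxValue, 0, 0)          // (max, min, sum, count)
val stats = nums.aggregateByKey(zero)(
  // fold one value into the accumulator (within a partition)
  (acc, v) => (math.max(acc._1, v), math.min(acc._2, v), acc._3 + v, acc._4 + 1),
  // merge two accumulators (across partitions)
  (x, y) => (math.max(x._1, y._1), math.min(x._2, y._2), x._3 + y._3, x._4 + y._4)
)
// stats.collect => Array((a,(5,1,9,3)), (b,(7,7,7,1))); average = sum / count
```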

What do you understand by aggregateByKey and combineByKey?

aggregateByKey takes an initial accumulator, a first lambda function to merge a value into an accumulator, and a second lambda function to merge two accumulators. combineByKey is more general than aggregateByKey.
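
To make the comparison concrete, here is a sketch (the data and the (sum, count) accumulator are invented for illustration) of the same aggregation written with both APIs; combineByKey's extra generality is that the initial accumulator is built from the first value by a function rather than given as a constant:

```scala
val nums = sc.parallelize(Seq(("a", 3), ("a", 1), ("b", 7), ("a", 5)))

// aggregateByKey: a constant zero value plus two merge functions
val viaAggregate = nums.aggregateByKey((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),
  (a, b)   => (a._1 + b._1, a._2 + b._2)
)

// combineByKey: a function builds the initial accumulator from the first
// value seen for each key
val viaCombine = nums.combineByKey(
  (v: Int)                       => (v, 1),
  (acc: (Int, Int), v: Int)      => (acc._1 + v, acc._2 + 1),
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)
)
// Both give Array((a,(9,3)), (b,(7,1)))
```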

How do you use aggregate in RDD?

The aggregate() function takes two functions: with the first one, we combine each element of our RDD with the accumulator, and with the second one, we combine the accumulators. We also supply the initial zero value of the type we want aggregate to return. In the worked example that answer refers to, this comes out as 21 + 22 + 31 + (4 * 3) = 86.
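
The arithmetic above comes from the worked example that answer refers to, which is not reproduced here. As an independent sketch (with made-up data), aggregate() on a plain RDD can compute a sum and a count in one pass:

```scala
// Sketch: sum and count of 1..6 with RDD.aggregate, zero value (0, 0)
val data = sc.parallelize(1 to 6, numSlices = 3)

val (sum, count) = data.aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),   // seqOp: within a partition
  (a, b)   => (a._1 + b._1, a._2 + b._2)  // combOp: across partitions
)
// sum = 21, count = 6
```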

What is the difference between reduceByKey and groupByKey?

groupByKey can cause out-of-disk problems, as all the data is sent over the network and collected on the reduce workers. With reduceByKey, by contrast, data are combined within each partition, so only one output per key per partition is sent over the network. A sketch of the difference follows below.
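
A small sketch (invented data, not from the answer) of the difference for a per-key sum:

```scala
val words = sc.parallelize(Seq(("one", 1), ("two", 1), ("one", 1), ("two", 1)))

// groupByKey: every (key, value) pair is shuffled, then summed afterwards
val viaGroup  = words.groupByKey().mapValues(_.sum)

// reduceByKey: values are pre-combined within each partition (map-side
// combine), so only one partial sum per key per partition crosses the network
val viaReduce = words.reduceByKey(_ + _)
// Both yield Array((one,2), (two,2))
```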


2 Answers

aggregateByKey() is quite different from reduceByKey. What happens is that reduceByKey is sort of a particular case of aggregateByKey.

aggregateByKey() will combine the values for a particular key, and the result of that combination can be any object you specify. You have to specify how the values are combined ("added") inside one partition (which is executed on the same node) and how you combine the results from different partitions (which may be on different nodes). reduceByKey is a particular case, in the sense that the result of the combination (e.g. a sum) is of the same type as the values, and the operation for combining results from different partitions is the same as the operation for combining values inside a partition.

An example: Imagine you have a list of pairs. You parallelize it:

val pairs = sc.parallelize(Array(("a", 3), ("a", 1), ("b", 7), ("a", 5))) 

Now you want to "combine" them by key producing a sum. In this case reduceByKey and aggregateByKey are the same:

val resReduce = pairs.reduceByKey(_ + _) //the same operation for everything
resReduce.collect
res3: Array[(String, Int)] = Array((b,7), (a,9))

//0 is the initial value, _+_ inside a partition, _+_ between partitions
val resAgg = pairs.aggregateByKey(0)(_+_, _+_)
resAgg.collect
res4: Array[(String, Int)] = Array((b,7), (a,9))

Now, imagine that you want the aggregation to be a Set of the values, which is a different type than the values themselves, which are integers (the sum of integers is also an integer):

import scala.collection.mutable.HashSet

//the initial value is an empty Set; adding an element to a set is the first _+_,
//joining two sets is the _++_
val sets = pairs.aggregateByKey(new HashSet[Int])(_+_, _++_)
sets.collect
res5: Array[(String, scala.collection.mutable.HashSet[Int])] = Array((b,Set(7)), (a,Set(1, 5, 3)))
answered Sep 23 '22 by Antoni


aggregateByKey() is almost identical to reduceByKey() (both calling combineByKey() behind the scenes), except you give a starting value for aggregateByKey(). Most people are familiar with reduceByKey(), so I will use that in the explanation.

The reason reduceByKey() is so much better is that it makes use of a MapReduce feature called a combiner. Any function like + or * can be used this way because it is associative and commutative, so the order of the elements it is applied to doesn't matter. This allows Spark to start "reducing" values with the same key even before they are all in the same partition.

On the flip side, groupByKey() gives you more versatility, since after grouping you get an Iterable of all the values for a key and can apply any function to it, meaning you could even pull all the elements into an array (see the sketch below). However, it is inefficient because for it to work all the (K,V) pairs for a given key have to be shuffled into one partition.
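
For example, a sketch (using made-up rows shaped like the question's data) of pulling a whole group into an array after groupByKey:

```scala
val rows = sc.parallelize(Seq(("k1", "c1"), ("k1", "c2"), ("k1", "c3"),
                              ("k2", "c4"), ("k2", "c5"), ("k2", "c6")))

// Each key's values arrive as an Iterable, which any function can consume,
// e.g. by materialising the whole group as an array
val grouped = rows.groupByKey().mapValues(_.toArray)
// => (k1, Array(c1, c2, c3)), (k2, Array(c4, c5, c6)), but only after every
//    pair for a key has been shuffled into a single partition
```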

The step that moves the data around in a reduce-type operation is generally called the shuffle. At the simplest level, the data is partitioned to each node (often with a hash partitioner) and then sorted on each node.
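
That hash partitioning is also what answers the original "where do the rows end up?" question: each key is mapped to a partition, so all rows with the same key land on the same node after the shuffle. A small sketch (assuming 3 partitions and Spark's default HashPartitioner):

```scala
import org.apache.spark.HashPartitioner

val rows = sc.parallelize(Seq(("k1", "c1"), ("k2", "c4"), ("k3", "c7"), ("k4", "c10")))

// Which partition (and hence which node) each key is shuffled to
val partitioner = new HashPartitioner(3)
rows.keys.distinct().collect().foreach { k =>
  println(s"$k -> partition ${partitioner.getPartition(k)}")
}
// groupByKey/aggregateByKey with this partitioner send every (k1, _) pair to
// the same partition, regardless of which node it was read on originally
```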

answered Sep 26 '22 by aaronman