 

Who can give a clear explanation for `combineByKey` in Spark?

I am learning Spark, but I can't understand the function `combineByKey`.

>>> data = sc.parallelize([("A",1),("A",2),("B",1),("B",2),("C",1)] )
>>> data.combineByKey(lambda v : str(v)+"_", lambda c, v : c+"@"+str(v), lambda c1, c2 : c1+c2).collect()

The output is:

[('A', '1_2_'), ('C', '1_'), ('B', '1_2_')]

First, I am very confused: where is the `@` from the second-step function `lambda c, v: c+"@"+str(v)`? I can't find any `@` in the result.

Second, I read the function description for `combineByKey`, but I am confused about the algorithm flow.
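A likely explanation for the missing `@` (a sketch, not verified against your exact setup): if the two values for a key land in *different* partitions, `createCombiner` runs for each of them and only `mergeCombiners` (which contains no `@`) joins the results. The plain-Python simulation below (a hypothetical helper, not Spark itself) reproduces both behaviors:

```python
# Plain-Python simulation of combineByKey's partition logic (an illustration,
# not Spark itself). Partitioning below is assumed for demonstration.

def create_combiner(v):
    return str(v) + "_"

def merge_value(c, v):          # runs only for repeat keys WITHIN a partition
    return c + "@" + str(v)

def merge_combiners(c1, c2):    # runs when partitions are merged; no "@" here
    return c1 + c2

def combine_by_key(partitions, create, merge_v, merge_c):
    # Phase 1: build one accumulator per key inside each partition.
    per_partition = []
    for part in partitions:
        accs = {}
        for k, v in part:
            accs[k] = merge_v(accs[k], v) if k in accs else create(v)
        per_partition.append(accs)
    # Phase 2: merge the partition-level accumulators across partitions.
    result = {}
    for accs in per_partition:
        for k, c in accs.items():
            result[k] = merge_c(result[k], c) if k in result else c
    return result

# If ("A",1) and ("A",2) sit in DIFFERENT partitions, createCombiner runs
# twice and mergeCombiners concatenates -> "1_2_" (no "@"):
spread = [[("A", 1), ("B", 1)], [("A", 2), ("B", 2), ("C", 1)]]
print(combine_by_key(spread, create_combiner, merge_value, merge_combiners))

# If both "A" values share ONE partition, mergeValue runs and "@" appears:
single = [[("A", 1), ("A", 2), ("B", 1), ("B", 2), ("C", 1)]]
print(combine_by_key(single, create_combiner, merge_value, merge_combiners))
```

In Spark you can test the same idea by forcing one partition, e.g. `sc.parallelize(data, 1)`, and checking whether the `@` shows up.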

asked Nov 26 '15 by BlackMamba




2 Answers

The groupByKey call makes no attempt at merging/combining values, so it’s an expensive operation.

Thus the combineByKey call is just such an optimization. When using combineByKey, values are merged into one value per key within each partition, and then each partition's value is merged into a single value per key. It's worth noting that the type of the combined value does not have to match the type of the original value, and often it won't. The combineByKey function takes 3 functions as arguments:

  1. A function that creates a combiner. In the aggregateByKey function the first argument was simply an initial zero value. In combineByKey we provide a function that will accept our current value as a parameter and return our new value that will be merged with additional values.

  2. The second function is a merging function that takes a value and merges/combines it into the previously collected values.

  3. The third function combines the merged values together. Basically this function takes the new values produced at the partition level and combines them until we end up with one singular value.

In other words, to understand combineByKey, it’s useful to think of how it handles each element it processes. As combineByKey goes through the elements in a partition, each element either has a key it hasn’t seen before or has the same key as a previous element.

If it’s a new element, combineByKey uses a function we provide, called createCombiner(), to create the initial value for the accumulator on that key. It’s important to note that this happens the first time a key is found in each partition, rather than only the first time the key is found in the RDD.

If it is a value we have seen before while processing that partition, it will instead use the provided function, mergeValue(), with the current value for the accumulator for that key and the new value.

Since each partition is processed independently, we can have multiple accumulators for the same key. When we are merging the results from each partition, if two or more partitions have an accumulator for the same key we merge the accumulators using the user-supplied mergeCombiners() function.
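The points above, including the type change from raw value to accumulator, can be sketched in plain Python (a Spark-free illustration with a hypothetical two-partition split): a per-key average where the values are `Int` but the accumulator is a `(sum, count)` pair.

```python
# A per-key average using the same three-function shape as combineByKey.
# The two-partition split below is assumed for demonstration.

data_parts = [[("a", 1), ("b", 2), ("a", 3)], [("c", 9), ("b", 6)]]

def create_combiner(v):           # value -> (sum, count); note the type change
    return (v, 1)

def merge_value(acc, v):          # fold one more value into (sum, count)
    return (acc[0] + v, acc[1] + 1)

def merge_combiners(a, b):        # combine two partitions' (sum, count) pairs
    return (a[0] + b[0], a[1] + b[1])

# Per-partition accumulators (one per key found in that partition):
partials = []
for part in data_parts:
    accs = {}
    for k, v in part:
        accs[k] = merge_value(accs[k], v) if k in accs else create_combiner(v)
    partials.append(accs)

# Cross-partition merge, then the final division:
merged = {}
for accs in partials:
    for k, c in accs.items():
        merged[k] = merge_combiners(merged[k], c) if k in merged else c

averages = {k: s / n for k, (s, n) in merged.items()}
print(averages)  # {'a': 2.0, 'b': 4.0, 'c': 9.0}
```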

References:

  • Learning Spark - Chapter 4.
  • Using combineByKey in Apache-Spark blog entry.
answered Oct 08 '22 by eliasah


Here is an example of combineByKey. The objective is to find a per key average of the input data.

scala> val kvData = Array(("a",1),("b",2),("a",3),("c",9),("b",6))
kvData: Array[(String, Int)] = Array((a,1), (b,2), (a,3), (c,9), (b,6))

scala> val kvDataDist = sc.parallelize(kvData,5)
kvDataDist: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> val keyAverages = kvDataDist.combineByKey(x=>(x,1),(a: (Int,Int),x: Int)=>(a._1+x,a._2+1),(b: (Int,Int),c: (Int,Int))=>(b._1+c._1,b._2+c._2))
keyAverages: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[4] at combineByKey at <console>:25

scala> keyAverages.collect
res0: Array[(String, (Int, Int))] = Array((c,(9,1)), (a,(4,2)), (b,(8,2)))

scala> val keyAveragesFinal = keyAverages.map(x => (x._1,x._2._1/x._2._2))
keyAveragesFinal: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:25

scala> keyAveragesFinal.collect
res1: Array[(String, Int)] = Array((c,9), (a,2), (b,4))

combineByKey takes 3 functions as arguments:

  1. Function 1 = createCombiner : Called once per key 'k', in each partition

    • Input: Value associated with a key 'k'
    • Output: Any desired output type 'O' based on the program logic. This output type will be automatically used further. In this example the output type chosen is (Int,Int). The first element in the Pair sums the values, the second element keeps track of the number of elements that make up the sum.
  2. Function 2 = mergeValue : Called once for every subsequent occurrence of key 'k' within a partition (i.e. the number of occurrences of 'k' in that partition minus 1)

    • Input: (Output of createCombiner: O, Subsequent value associated with key 'k' in this partition)
    • Output: O
  3. Function 3 = mergeCombiners : Called once for each pairwise merge of partition-level results, i.e. one fewer time than the number of partitions in which key 'k' exists

    • Input: (Output of mergeValue from partition X: O, Output of mergeValue from partition Y: O)
    • Output: O
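The call counts claimed above can be checked with an instrumented plain-Python simulation (hypothetical two-partition split, not Spark itself): `create_combiner` fires once per (key, partition), `merge_value` only for repeats within a partition, and `merge_combiners` only when a key spans partitions.

```python
# Counting how often each of the three functions runs. The partitioning
# below is assumed for demonstration; Spark's actual split may differ.
calls = {"create": 0, "merge_value": 0, "merge_combiners": 0}

def create_combiner(v):
    calls["create"] += 1
    return (v, 1)

def merge_value(acc, v):
    calls["merge_value"] += 1
    return (acc[0] + v, acc[1] + 1)

def merge_combiners(a, b):
    calls["merge_combiners"] += 1
    return (a[0] + b[0], a[1] + b[1])

# Two partitions; "a" appears twice in partition 0, "b" once per partition.
partitions = [[("a", 1), ("a", 3), ("b", 2)], [("b", 6), ("c", 9)]]

partials = []
for part in partitions:
    accs = {}
    for k, v in part:
        accs[k] = merge_value(accs[k], v) if k in accs else create_combiner(v)
    partials.append(accs)

merged = {}
for accs in partials:
    for k, c in accs.items():
        merged[k] = merge_combiners(merged[k], c) if k in merged else c

print(calls)
# create:          once per (key, partition): a->1, b->2, c->1  => 4
# merge_value:     repeats within a partition: only "a"          => 1
# merge_combiners: keys spanning partitions:   only "b"          => 1
```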
answered Oct 08 '22 by Avinash Ganta