I am trying to understand how each step in combineByKey works.
Can someone please help me understand it for the RDD below?
val rdd = sc.parallelize(List(
("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5),("B", 4),
("B", 10), ("B", 11), ("B", 20), ("B", 25),("C", 32), ("C", 91),
("C", 122), ("C", 3), ("C", 55)), 2)
rdd.combineByKey(
(x:Int) => (x, 1),
(acc:(Int, Int), x) => (acc._1 + x, acc._2 + 1),
(acc1:(Int, Int), acc2:(Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
Let's break the process down:
First, createCombiner creates the initial value (combiner) the first time a key is encountered on a partition, that is, when no combiner exists yet for that key there --> (firstValueEncountered, 1). So, this is merely initializing a tuple with the first value and a per-key counter of 1.
Then, mergeValue is triggered only if a combiner (a tuple in our case) has already been created for that key on this partition --> (existingTuple._1 + subsequentValue, existingTuple._2 + 1). This adds the newly encountered value to the existing tuple's value (in the first slot) and increments the existing tuple's counter (in the second slot). So, we end up with a running sum and a running count for the key on that partition.
Then, mergeCombiner (the parameter is named mergeCombiners in Spark's API) takes the combiners (tuples) created on each partition and merges them together --> (tupleFromPartition1._1 + tupleFromPartition2._1, tupleFromPartition1._2 + tupleFromPartition2._2). This is merely adding the values from each tuple together and the counters together into one tuple.
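To make the three roles concrete, here is a minimal sketch that writes the same functions from the question as named values and calls them by hand, with no Spark involved. The object name CombinerFunctions, the Combiner type alias, and the sample inputs are my own choices for illustration.

```scala
object CombinerFunctions {
  // Illustrative alias: (running sum, running count)
  type Combiner = (Int, Int)

  // Called the first time a key is seen on a partition.
  val createCombiner: Int => Combiner = x => (x, 1)

  // Called for every further value of that key on the same partition.
  val mergeValue: (Combiner, Int) => Combiner =
    (acc, x) => (acc._1 + x, acc._2 + 1)

  // Called to merge combiners that were built on different partitions.
  val mergeCombiners: (Combiner, Combiner) => Combiner =
    (a, b) => (a._1 + b._1, a._2 + b._2)

  val first: Combiner = createCombiner(3)                 // (3, 1)
  val second: Combiner = mergeValue(first, 9)             // (12, 2)
  val merged: Combiner = mergeCombiners(second, (12, 1))  // (24, 3)
}
```

Note that only mergeValue sees raw values; mergeCombiners only ever sees tuples that were already built on a partition.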
Now, let's split a subset of your data across two partitions (an illustrative split) and see it in action:
("A", 3), ("A", 9), ("A", 12), ("B", 4), ("B", 10), ("B", 11)
Partition 1
A=3 --> createCombiner(3) ==> accum[A] = (3, 1)
A=9 --> mergeValue(accum[A], 9) ==> accum[A] = (3 + 9, 1 + 1)
B=11 --> createCombiner(11) ==> accum[B] = (11, 1)
Partition 2
A=12 --> createCombiner(12) ==> accum[A] = (12, 1)
B=4 --> createCombiner(4) ==> accum[B] = (4, 1)
B=10 --> mergeValue(accum[B], 10) ==> accum[B] = (4 + 10, 1 + 1)
Merge partitions together
A ==> mergeCombiner((12, 2), (12, 1)) ==> (12 + 12, 2 + 1)
B ==> mergeCombiner((11, 1), (14, 2)) ==> (11 + 14, 1 + 2)
So, you should get back an array something like this:
Array((A, (24, 3)), (B, (25, 3)))
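If you want to replay the walkthrough without a cluster, the sketch below imitates the two phases on plain Scala collections. It is only an approximation of what Spark does: simulateCombineByKey is a hypothetical helper I made up (not a Spark API), and the two partitions are hard-coded to match the split the walkthrough implies, whereas real Spark decides the partitioning itself and does not guarantee the order of the collected array.

```scala
object CombineByKeySketch {
  // Hypothetical helper, NOT part of Spark: mimics combineByKey on in-memory partitions.
  def simulateCombineByKey[K, V, C](
      partitions: Seq[Seq[(K, V)]],
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    // Phase 1: within each partition, build one combiner per key.
    val perPartition: Seq[Map[K, C]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.get(k) match {
          case Some(c) => acc.updated(k, mergeValue(c, v))   // key already seen here
          case None    => acc.updated(k, createCombiner(v))  // first value for this key here
        }
      }
    }
    // Phase 2: merge the per-partition combiners key by key.
    perPartition.foldLeft(Map.empty[K, C]) { (merged, partMap) =>
      partMap.foldLeft(merged) { case (acc, (k, c)) =>
        acc.get(k) match {
          case Some(existing) => acc.updated(k, mergeCombiners(existing, c))
          case None           => acc.updated(k, c)
        }
      }
    }
  }

  // Assumed split, matching the walkthrough above.
  val partition1: Seq[(String, Int)] = Seq(("A", 3), ("A", 9), ("B", 11))
  val partition2: Seq[(String, Int)] = Seq(("A", 12), ("B", 4), ("B", 10))

  val result: Map[String, (Int, Int)] =
    simulateCombineByKey[String, Int, (Int, Int)](
      Seq(partition1, partition2),
      (x: Int) => (x, 1),
      (acc: (Int, Int), x: Int) => (acc._1 + x, acc._2 + 1),
      (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2))
  // result == Map("A" -> (24, 3), "B" -> (25, 3))

  // Dividing sum by count gives a per-key average.
  val averages: Map[String, Double] =
    result.map { case (k, (sum, count)) => k -> sum.toDouble / count }
}
```

The (sum, count) pair is carried through both phases so that a per-key average can be taken at the very end, as the last line does.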