I am learning Apache Spark and have come across the RDD action aggregate, and I have no clue how it works. Can someone explain in detail, step by step, how we arrive at the result below for this code?
RDD input = {1,2,3,3}
RDD aggregate function:
rdd.aggregate((0, 0))(
  (x, y) => (x._1 + y, x._2 + 1),
  (x, y) => (x._1 + y._1, x._2 + y._2))
output: (9,4)
Thanks
If you are not sure what is going on, it is best to follow the types. Omitting the implicit ClassTag for brevity, we start with something like this:
abstract class RDD[T] extends Serializable with Logging
def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U
If you ignore all the additional parameters, you'll see that aggregate is a function which maps from RDD[T] to U. This means that the type of the values in the input RDD doesn't have to be the same as the type of the output value. So it is clearly different from, for example, reduce:
def reduce(func: (T, T) ⇒ T): T
or fold:
def fold(zeroValue: T)(op: (T, T) => T): T
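To make the type difference concrete, here is a small sketch on a plain Scala List rather than an RDD (an assumption, so that it runs without Spark; the collection methods reduce, fold and foldLeft are only stand-ins). reduce and fold stay within the element type Int, while a fold whose accumulator has a different type, which is what aggregate allows, can return an (Int, Int):

```scala
// Plain Scala collection as a stand-in for an RDD (assumption: no Spark needed here).
val data = List(1, 2, 3, 3)

// reduce and fold: the result has the element type, Int.
val viaReduce: Int = data.reduce(_ + _)
val viaFold: Int = data.fold(0)(_ + _)

// An accumulator of a different type U = (Int, Int): running sum and running count.
val sumAndCount: (Int, Int) =
  data.foldLeft((0, 0))((acc, x) => (acc._1 + x, acc._2 + 1))
```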
Like fold, aggregate requires a zeroValue. How to choose it? It should be an identity (neutral) element with respect to combOp.
You also have to provide two functions:
seqOp, which maps from (U, T) to U
combOp, which maps from (U, U) to U
Based on these signatures alone you should already see that only seqOp can access the raw data. It takes a value of type U and another one of type T, and returns a value of type U. In your case it is a function with the following signature:
((Int, Int), Int) => (Int, Int)
At this point you probably suspect it is used for some kind of fold-like operation.
The second function takes two arguments of type U and returns a value of type U. As stated before, it doesn't touch the original data and can operate only on the values already produced by seqOp. In your case this function has the following signature:
((Int, Int), (Int, Int)) => (Int, Int)
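As a quick illustration of the two signatures, the functions from the question can be written with explicit types and each applied once. The parameter names acc, elem, a and b are chosen here for readability and are not from the original code:

```scala
// U = (Int, Int) is the accumulator (sum, count); T = Int is the element type.
val seqOp: ((Int, Int), Int) => (Int, Int) =
  (acc, elem) => (acc._1 + elem, acc._2 + 1)

val combOp: ((Int, Int), (Int, Int)) => (Int, Int) =
  (a, b) => (a._1 + b._1, a._2 + b._2)

// seqOp folds one raw element into an accumulator.
val afterOne = seqOp((0, 0), 1)

// combOp merges two accumulators and never sees a raw element.
val merged = combOp((3, 2), (6, 2))
```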
So how do we put all of that together?
First, each partition is aggregated using the standard Iterator.aggregate with zeroValue, seqOp and combOp passed as z, seqop and combop respectively. Since the InterruptibleIterator used internally doesn't override aggregate, it should be executed as a simple foldLeft(zeroValue)(seqOp).
Next, the partial results collected from each partition are aggregated using combOp.
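The two steps can be sketched without Spark. simulateAggregate below is a hypothetical helper, not part of any Spark API, and each inner Seq stands in for one partition. Seeding the final merge with zeroValue is an assumption of this sketch; with a neutral zeroValue it gives the same result as a plain reduce over the partial results.

```scala
// Hypothetical helper, not Spark API: each inner Seq plays the role of one partition.
def simulateAggregate[T, U](partitions: Seq[Seq[T]])(zeroValue: U)(
    seqOp: (U, T) => U, combOp: (U, U) => U): U = {
  // Step 1: fold every partition independently, each starting from zeroValue.
  val partials = partitions.map(_.foldLeft(zeroValue)(seqOp))
  // Step 2: merge the partial results with combOp, again starting from zeroValue.
  partials.foldLeft(zeroValue)(combOp)
}

val result = simulateAggregate(Seq(Seq(1, 2), Seq(3, 3), Seq.empty[Int]))((0, 0))(
  (acc: (Int, Int), x: Int) => (acc._1 + x, acc._2 + 1),
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2))
```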
Let's assume that the input RDD has three partitions with the following distribution of values:
Iterator(1, 2)
Iterator(3, 3)
Iterator()
You can expect that the execution, ignoring absolute order, will be equivalent to something like this:
val seqOp = (x: (Int, Int), y: Int) => (x._1 + y, x._2 + 1)
val combOp = (x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2)
Seq(Iterator(1, 2), Iterator(3, 3), Iterator())
.map(_.foldLeft((0, 0))(seqOp))
.reduce(combOp)
foldLeft for a single partition can look like this:
Iterator(1, 2).foldLeft((0, 0))(seqOp)
Iterator(2).foldLeft((1, 1))(seqOp)
(3, 2)
and over all partitions
Seq((3,2), (6,2), (0,0))
which, combined, will give you the observed result:
(3 + 6 + 0, 2 + 2 + 0)
(9, 4)
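If you want to see every intermediate accumulator rather than only the final one, scanLeft on plain Scala lists can reproduce the trace. This is a sketch with the same assumed partition layout, using List instead of the iterators Spark works with:

```scala
val seqOp = (acc: (Int, Int), x: Int) => (acc._1 + x, acc._2 + 1)
val combOp = (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)

// Same three partitions as assumed in the walkthrough.
val partitions = Seq(List(1, 2), List(3, 3), List.empty[Int])

// scanLeft keeps every intermediate accumulator; the last one is the foldLeft result.
val traces = partitions.map(_.scanLeft((0, 0))(seqOp))
val partials = traces.map(_.last)

// Merge the per-partition results.
val total = partials.reduce(combOp)
```

Here traces holds the step-by-step accumulators for each partition, and partials holds the per-partition results that are then merged.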
In general this is a common pattern you will find all over Spark, where you pass a neutral value, a function used to process values per partition, and a function used to merge partial aggregates from different partitions. Some other examples include:
aggregateByKey
Aggregators on Spark Datasets
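Here is my understanding, for your reference:
Imagine you have two nodes: one takes the first two list elements {1,2} as input, and the other takes {3, 3}. (This partitioning is only for convenience.)
At the first node, "(x, y) => (x._1 + y, x._2 + 1)" is applied: the first x is (0,0) as given and y is your first element 1, so the output is (0+1, 0+1). Then comes your second element y=2, and the output is (1 + 2, 1 + 1), which is (3, 2).
At the second node, the same procedure happens in parallel, and you'll have (6, 2).
"(x, y) => (x._1 + y._1, x._2 + y._2)" tells you how to merge the results of the two nodes, and you'll get (9,4).
One thing worth noticing is that the zero value is added to the result once per partition plus once more in the final merge, that is (number of partitions)+1 times rather than length(rdd)+1 times; the two coincide only when every partition holds exactly one element. With (0,0) this has no effect, but with (1,1) it does. The output below, (14,9) = (9+5, 4+5), is consistent with four partitions: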
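To check how often the zero value enters the result, here is a small simulation on plain Scala collections. simulate is a hypothetical stand-in for rdd.aggregate, written under the assumption that the zero value seeds every partition and also the final merge; the partition layouts are made up for illustration. In this sketch the extra amount tracks the number of partitions plus one, which equals the number of elements plus one only when each element sits in its own partition:

```scala
val seqOp = (acc: (Int, Int), x: Int) => (acc._1 + x, acc._2 + 1)
val combOp = (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)

// Hypothetical stand-in for rdd.aggregate: the zero value seeds
// every partition and also the final merge of the partial results.
def simulate(partitions: Seq[Seq[Int]], zero: (Int, Int)): (Int, Int) =
  partitions.map(_.foldLeft(zero)(seqOp)).foldLeft(zero)(combOp)

val fourPartitions = Seq(Seq(1), Seq(2), Seq(3), Seq(3))
val twoPartitions = Seq(Seq(1, 2), Seq(3, 3))

val neutral = simulate(fourPartitions, (0, 0))   // zero value has no effect
val withOnes4 = simulate(fourPartitions, (1, 1)) // (1,1) added 4 + 1 times
val withOnes2 = simulate(twoPartitions, (1, 1))  // (1,1) added 2 + 1 times
```

With four partitions the simulation yields the same (14,9) as the transcript that follows, while two partitions yield (12,7).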
"scala> rdd.aggregate((1,1)) ((x, y) =>(x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2)) res1: (Int, Int) = (14,9)"