I knew how a normal aggregate works in scala and its use over fold. Tried a lot to know how the below code works, but couldn't. Could someone help me in explaining how it works and gives me a output of (10,4)
val input=List(1,2,3,4)
val result = input.aggregate((0, 0))(
(acc, value) => (acc._1 + value, acc._2 + 1),
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
Could someone help me in explaining how it works and gives me a output of (10,4)
When using aggregate, you provide three parameters:
a function that given a partition, will accumulate the result within it
a function that will combine two partitions
So in your case, the initial value for a partition is the tuple (0, 0).
Then the accumulator function you defined will sum the current element you're traversing with the first element of the tuple and increment the second element of the tuple by one. In fact, it will compute the sum of the elements in a partition and its number of elements.
The combiner function combined two tuples. As you defined it, it will sum the sums and count the number of elements of 2 partitions. It's not used in your case because you traverse the pipeline sequentially. You could call .par on the List so that you get a parallel implementation to see the combiner in action (note that it has to be an associative function).
Thus you get (10, 4) because 1+2+3+4=10 and there was 4 elements in the list (you did 4 additions).
You could add a print statement in the accumulator function (running on a sequential input), to see how it behaves:
Acc: (0,0) - value:1
Acc: (1,1) - value:2
Acc: (3,2) - value:3
Acc: (6,3) - value:4
I knew how a normal aggregate works in scala and its use over fold.
For a sequential input, aggregate is a foldLeft:
def aggregate[B](z: =>B)(seqop: (B, A) => B, combop: (B, B) => B): B = foldLeft(z)(seqop)
For a parallel input, the list is split into chunks so that multiple threads can work separately. The accumulator function is run on each chunk, using the initial value. When two threads need to merge their results, the combine function is used:
def aggregate[S](z: =>S)(seqop: (S, T) => S, combop: (S, S) => S): S = {
tasksupport.executeAndWaitResult(new Aggregate(() => z, seqop, combop, splitter))
}
This is the principle of the fork-join model but it requires that your task can be parallelizable well. It's the case here, because a thread does not need to know the result of another thread to do its job.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With