I am looking for a better explanation of the aggregate functionality that is available via Spark in Python.
The example I have is as follows (using pyspark, Spark version 1.2.0):
sc.parallelize([1, 2, 3, 4]).aggregate(
    (0, 0),
    (lambda acc, value: (acc[0] + value, acc[1] + 1)),
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
Output:
(10, 4)
I get the expected result (10, 4): the sum of 1+2+3+4 and the count of 4 elements. If I change the initial value passed to aggregate from (0, 0) to (1, 0), I get the following result:
sc.parallelize([1, 2, 3, 4]).aggregate(
    (1, 0),
    (lambda acc, value: (acc[0] + value, acc[1] + 1)),
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
Output:
(19, 4)
The value increases by 9. If I change it to (2, 0), the value goes to (28, 4), and so on.
Can someone explain to me how this value is calculated? I expected the value to go up by 1, not by 9; I expected to see (11, 4), but instead I am seeing (19, 4).
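For reference, I did not pass a partition count to parallelize(), so it uses the context's default. A quick way to check it (a sketch, assuming a standard pyspark shell with sc defined and that getNumPartitions() is available in your version):
rdd = sc.parallelize([1, 2, 3, 4])
print(sc.defaultParallelism)      # default number of slices used by parallelize()
print(rdd.getNumPartitions())     # number of partitions this RDD actually has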
I wasn't fully convinced by the accepted answer, and JohnKnight's answer helped, so here's my point of view:
First, let's explain aggregate() in my own words:
Prototype:
aggregate(zeroValue, seqOp, combOp)
Description:
aggregate() lets you take an RDD and generate a single value that is of a different type than what was stored in the original RDD.
Parameters:
zeroValue: The initialization value for your result, in the desired format.
seqOp: The operation you want to apply to RDD records. Runs once for every record in a partition.
combOp: Defines how the resulting objects (one for every partition) get combined.
Example:
Compute the sum of a list and the length of that list. Return the result in a pair of (sum, length).
In a Spark shell, I first created a list with 4 elements, with 2 partitions:
listRDD = sc.parallelize([1,2,3,4], 2)
then I defined my seqOp:
seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )
and my combOp:
combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )
and then I aggregated:
listRDD.aggregate((0, 0), seqOp, combOp)
Out[8]: (10, 4)
As you can see, I gave descriptive names to my variables, but let me explain it further:
The first partition has the sublist [1, 2]. We will apply the seqOp to each element of that sublist, and this will produce a local result: a pair of (sum, length) that reflects the result locally, only in that first partition.
So, let's start: local_result gets initialized to the zeroValue parameter we provided to aggregate(), i.e. (0, 0), and list_element is the first element of the list, i.e. 1. As a result, this is what happens:
0 + 1 = 1   (sum)
0 + 1 = 1   (length)
Now the local result is (1, 1): so far, for the 1st partition, after processing only the first element, the sum is 1 and the length is 1. Notice that local_result gets updated from (0, 0) to (1, 1). For the second element:
1 + 2 = 3   (sum)
1 + 1 = 2   (length)
and now the local result is (3, 2), which will be the final result from the 1st partition, since there are no other elements in the sublist of the 1st partition.
Doing the same for the 2nd partition, we get (7, 2).
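If you want to check those local results yourself, here is a small sketch (assuming the same pyspark shell, with sc available) that uses glom() and mapPartitions() to show what each partition holds and what seqOp produces locally:
from functools import reduce   # the built-in reduce works too on Python 2

listRDD = sc.parallelize([1, 2, 3, 4], 2)

# glom() gathers each partition's elements into a list
print(listRDD.glom().collect())          # [[1, 2], [3, 4]]

# run seqOp inside each partition, starting from the zeroValue (0, 0)
seqOp = lambda local_result, list_element: (local_result[0] + list_element,
                                            local_result[1] + 1)
local_results = listRDD.mapPartitions(
    lambda part: [reduce(seqOp, part, (0, 0))]   # one (sum, length) per partition
).collect()
print(local_results)                     # [(3, 2), (7, 2)]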
Now we apply the combOp to the local results to form the final, global result:
(3, 2) + (7, 2) = (10, 4)
Example described in 'figure':

            (0, 0) <-- zeroValue

    [1, 2]                  [3, 4]

    0 + 1 = 1               0 + 3 = 3
    0 + 1 = 1               0 + 1 = 1

    1 + 2 = 3               3 + 4 = 7
    1 + 1 = 2               1 + 1 = 2
       |                       |
       v                       v
    (3, 2)                  (7, 2)
         \                    /
          \                  /
           \                /
            \              /
             \            /
              \          /
              ------------
              |  combOp  |
              ------------
                    |
                    v
                 (10, 4)
Inspired by this great example.
So now, if the zeroValue is not (0, 0) but (1, 0), one would expect to get (8 + 4, 2 + 2) = (12, 4), which doesn't explain what you experience. Even if we alter the number of partitions of my example, I won't be able to get that again.
The key here is JohnKnight's answer, which states that the zeroValue is not only applied once per partition, but may be applied more times than you expect: it seeds the seqOp in every partition and also seeds the final combOp on the driver, so with P partitions it is added P + 1 times. The (19, 4) in the question (10 + 9 × 1) suggests 8 partitions plus the final combine.
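To see this concretely, here is a minimal pure-Python sketch of those semantics (my own simulation of how the zeroValue is applied, not Spark's actual implementation):
from functools import reduce

def simulate_aggregate(partitions, zero_value, seq_op, comb_op):
    # seqOp starts from zero_value once in every partition...
    local_results = [reduce(seq_op, part, zero_value) for part in partitions]
    # ...and combOp starts from zero_value once more on the driver
    return reduce(comb_op, local_results, zero_value)

seq_op  = lambda acc, value: (acc[0] + value, acc[1] + 1)
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])

print(simulate_aggregate([[1, 2], [3, 4]], (0, 0), seq_op, comb_op))  # (10, 4)
print(simulate_aggregate([[1, 2], [3, 4]], (1, 0), seq_op, comb_op))  # (13, 4): 2 partitions + 1 combine

# With 8 partitions (as the question's setup appears to have had),
# the zeroValue (1, 0) is added 8 + 1 = 9 times:
eight_partitions = [[1], [2], [3], [4], [], [], [], []]
print(simulate_aggregate(eight_partitions, (1, 0), seq_op, comb_op))  # (19, 4)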