
Calculating the averages for each KEY in a Pairwise (K,V) RDD in Spark with Python

I want to share this particular Apache Spark with Python solution because documentation for it is quite poor.

I wanted to calculate the average value of K/V pairs (stored in a Pairwise RDD), by KEY. Here is what the sample data looks like:

>>> rdd1.take(10) # Show a small sample.
[(u'2013-10-09', 7.60117302052786),
 (u'2013-10-10', 9.322709163346612),
 (u'2013-10-10', 28.264462809917358),
 (u'2013-10-07', 9.664429530201343),
 (u'2013-10-07', 12.461538461538463),
 (u'2013-10-09', 20.76923076923077),
 (u'2013-10-08', 11.842105263157894),
 (u'2013-10-13', 32.32514177693762),
 (u'2013-10-13', 26.249999999999996),
 (u'2013-10-13', 10.693069306930692)]

Now the following code sequence is a less than optimal way to do it, but it does work. It is what I was doing before I figured out a better solution. It's not terrible but -- as you'll see in the answer section -- there is a more concise, efficient way.

>>> import operator
>>> countsByKey = sc.broadcast(rdd1.countByKey())
# SAMPLE OUTPUT of countsByKey.value: {u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...}
>>> rdd1 = rdd1.reduceByKey(operator.add) # Calculate the numerators (i.e. the SUMs).
>>> rdd1 = rdd1.map(lambda x: (x[0], x[1]/countsByKey.value[x[0]])) # Divide each SUM by its denominator (i.e. the COUNT).
>>> print(rdd1.collect())
  [(u'2013-10-09', 11.235365503035176),
   (u'2013-10-07', 23.39500642456595),
   ... snip ...
  ]
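To make the two-pass logic above concrete without a Spark cluster, here is a minimal plain-Python sketch. It is only an illustration: `pairs` is made-up sample data standing in for rdd1, and `average_two_pass` is a hypothetical helper, not a Spark API. A `Counter` plays the role of countByKey() and a dict of running sums plays the role of reduceByKey(operator.add).

```python
from collections import Counter, defaultdict

# Hypothetical sample data standing in for rdd1 (values chosen for easy arithmetic).
pairs = [("2013-10-09", 2.0), ("2013-10-10", 4.0),
         ("2013-10-10", 5.0), ("2013-10-09", 6.0),
         ("2013-10-09", 10.0)]

def average_two_pass(kv_pairs):
    """Average the values per key using a separate count pass and sum pass."""
    # Pass 1: count the occurrences of each key (the role of countByKey()).
    counts = Counter(key for key, _ in kv_pairs)
    # Pass 2: sum the values of each key (the role of reduceByKey(operator.add)).
    sums = defaultdict(float)
    for key, value in kv_pairs:
        sums[key] += value
    # Divide each sum by its count (the role of the final map()).
    return {key: sums[key] / counts[key] for key in sums}

averages = average_two_pass(pairs)
print(averages)
```

Note that the data is traversed twice. In Spark, the counts additionally travel to the driver and back out as a broadcast variable, which is the kind of overhead a single-pass approach avoids.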
NYCeyes asked Apr 28 '15 21:04





2 Answers

A much better way to do this is to use the rdd.aggregateByKey() method. This method is poorly documented in the Apache Spark with Python documentation, which is why I wrote this Q&A, and until recently I had been using the code sequence above. That sequence is less efficient, so avoid it unless necessary.

Here's how to do the same using the rdd.aggregateByKey() method (recommended):

By KEY, simultaneously calculate the SUM (the numerator for the average that we want to compute), and COUNT (the denominator for the average that we want to compute):

>>> aTuple = (0,0) # The zero value: (runningSum, runningCount). A literal (0,0) could also be passed inline.
>>> rdd1 = rdd1.aggregateByKey(aTuple, lambda a,b: (a[0] + b,    a[1] + 1),
                                       lambda a,b: (a[0] + b[0], a[1] + b[1]))

Here is what each a and b pair above means (so you can visualize what's happening):

   First lambda expression for Within-Partition Reduction Step:
   a: is a TUPLE that holds: (runningSum, runningCount).
   b: is a SCALAR that holds the next Value.

   Second lambda expression for Cross-Partition Reduction Step:
   a: is a TUPLE that holds: (runningSum, runningCount).
   b: is a TUPLE that holds: (nextPartitionsSum, nextPartitionsCount).
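The two steps described above can be simulated in plain Python to see which lambda receives which kind of argument. This is only a sketch of the idea, not how Spark is implemented internally: the `partitions` list, the helper names `aggregate_partition` and `merge_partitions`, and the sample values are all assumptions made for illustration.

```python
from functools import reduce

# Hypothetical data already split into two partitions, as Spark might hold it.
partitions = [
    [("a", 1.0), ("b", 10.0), ("a", 3.0)],
    [("a", 5.0), ("b", 20.0)],
]

zero = (0, 0)                                       # (runningSum, runningCount)
seq_op = lambda a, b: (a[0] + b, a[1] + 1)          # a: tuple, b: scalar value
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])   # a: tuple, b: tuple

def aggregate_partition(records):
    """Within-partition step: fold each value into its key's (sum, count)."""
    acc = {}
    for key, value in records:
        acc[key] = seq_op(acc.get(key, zero), value)
    return acc

def merge_partitions(left, right):
    """Cross-partition step: combine the per-key (sum, count) tuples."""
    merged = dict(left)
    for key, partial in right.items():
        merged[key] = comb_op(merged[key], partial) if key in merged else partial
    return merged

partials = [aggregate_partition(p) for p in partitions]
sum_counts = reduce(merge_partitions, partials)
print(partials)    # one {key: (sum, count)} dict per partition
print(sum_counts)  # the merged {key: (sum, count)} dict
```

The first lambda only ever sees raw values, and the second only ever sees (sum, count) tuples, which is why the two have different shapes.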

Finally, calculate the average for each KEY, and collect results.

>>> finalResult = rdd1.mapValues(lambda v: v[0]/v[1]).collect()
>>> print(finalResult)
  [(u'2013-09-09', 11.235365503035176),
   (u'2013-09-01', 23.39500642456595),
   (u'2013-09-03', 13.53240060820617),
   (u'2013-09-05', 13.141148418977687),
   ... snip ...
  ]

I hope this question and answer with aggregateByKey() will help.

NYCeyes answered Oct 09 '22 01:10



To my mind a more readable equivalent to an aggregateByKey with two lambdas is:

rdd1 = rdd1 \
    .mapValues(lambda v: (v, 1)) \
    .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1]))

In this way the whole average calculation would be:

avg_by_key = rdd1 \
    .mapValues(lambda v: (v, 1)) \
    .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1])) \
    .mapValues(lambda v: v[0]/v[1]) \
    .collectAsMap()
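As a rough illustration of why each value is first paired with a count of 1, here is the same chain applied by hand to the values of a single key in plain Python. The `values` list is made up, and `functools.reduce` merely stands in for what reduceByKey() does per key; this is a sketch, not Spark code.

```python
from functools import reduce

# Hypothetical values observed for one key.
values = [2.0, 4.0, 9.0]

# mapValues(lambda v: (v, 1)): pair every value with a count of 1.
paired = [(v, 1) for v in values]

# reduceByKey(...): add the sums and the counts component-wise.
add_pairs = lambda a, b: (a[0] + b[0], a[1] + b[1])
total, count = reduce(add_pairs, paired)

# mapValues(lambda v: v[0]/v[1]): divide once, at the very end.
average = total / count

# For contrast: averaging pairwise as we go gives a different number,
# because that operation is not associative.
naive = reduce(lambda a, b: (a + b) / 2, values)
print(average, naive)
```

Carrying (sum, count) through the reduction keeps the reduce function associative and commutative, which reduceByKey() requires, and postpones the division until all values for the key have been merged.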
pat answered Oct 09 '22 01:10
