
PySpark reduceByKey to add Key/Tuple counts

I have the following data:

[(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'X')]

What I want to do is, for each key, count the instances of the value (a one-character string). So I first did a map:

.map(lambda x: (x[0], [x[1], 1]))

This makes it a key/tuple of:

[(13, ['D', 1]), (14, ['T', 1]), (32, ['6', 1]), (45, ['T', 1]), (47, ['2', 1]), (48, ['0', 1]), (49, ['2', 1]), (50, ['0', 1]), (51, ['T', 1]), (53, ['2', 1]), (54, ['0', 1]), (13, ['A', 1]), (14, ['T', 1]), (32, ['6', 1]), (45, ['A', 1]), (47, ['2', 1]), (48, ['0', 1]), (49, ['2', 1]), (50, ['0', 1]), (51, ['X', 1])]

For the last part, I just can't figure out how to count the instances of each character per key. For instance, key 13 will have one 'D' and one 'A', while key 14 will have two 'T's, etc.
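
To be explicit, the result I'm after would be something like this (the exact structure doesn't matter):

[(13, {'D': 1, 'A': 1}), (14, {'T': 2}), (32, {'6': 2}), ...]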

asked Apr 23 '15 by theMadKing

2 Answers

Instead of:

.map(lambda x: (x[0], [x[1], 1]))

We could do this:

.map(lambda x: ((x[0], x[1]), 1))
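
Applied to the sample data, that map keys every record by its (id, character) pair:

[((13, 'D'), 1), ((14, 'T'), 1), ((32, '6'), 1), ((45, 'T'), 1), ((47, '2'), 1), ..., ((51, 'X'), 1)]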

Then, in the last step, we can use reduceByKey with add. Note that add comes from the operator module.

Putting it together:

from operator import add

rdd = sc.parallelize([(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'X')])
# Key each record by its (id, character) pair and sum the 1s per composite key
rdd.map(lambda x: ((x[0], x[1]), 1)).reduceByKey(add).collect()
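
For reference, on the sample data this should return each (key, character) pair with its count, something along these lines (ordering may vary between runs):

[((13, 'D'), 1), ((13, 'A'), 1), ((14, 'T'), 2), ((32, '6'), 2), ((45, 'T'), 1), ((45, 'A'), 1), ((47, '2'), 2), ((48, '0'), 2), ((49, '2'), 2), ((50, '0'), 2), ((51, 'T'), 1), ((51, 'X'), 1), ((53, '2'), 1), ((54, '0'), 1)]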
answered Sep 20 '22 by srctaha

I'm much more familiar with Spark in Scala, so there may be better ways than Counter to count the characters in the iterable produced by groupByKey, but here's an option:

from collections import Counter

rdd = sc.parallelize([(13, 'D'), (14, 'T'), (32, '6'), (45, 'T'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'T'), (53, '2'), (54, '0'), (13, 'A'), (14, 'T'), (32, '6'), (45, 'A'), (47, '2'), (48, '0'), (49, '2'), (50, '0'), (51, 'X')])
# Gather all characters for each key, then tally them with Counter
rdd.groupByKey().mapValues(Counter).collect()

[(48, Counter({'0': 2})),
 (32, Counter({'6': 2})),
 (49, Counter({'2': 2})),
 (50, Counter({'0': 2})),
 (51, Counter({'X': 1, 'T': 1})),
 (53, Counter({'2': 1})),
 (13, Counter({'A': 1, 'D': 1})),
 (45, Counter({'A': 1, 'T': 1})),
 (14, Counter({'T': 2})),
 (54, Counter({'0': 1})),
 (47, Counter({'2': 2}))]
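
If shipping every value through groupByKey is a concern, a similar per-key Counter result can probably be built with aggregateByKey, which merges counts within each partition before shuffling. A minimal sketch, assuming the same rdd and one-character string values as above:

from collections import Counter

rdd.aggregateByKey(
    Counter(),                           # start each key with an empty Counter
    lambda acc, ch: acc + Counter(ch),   # fold one character into the partition-local counts
    lambda a, b: a + b,                  # merge partial counts from different partitions
).collect()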
answered Sep 21 '22 by mattsilver