 

How to add columns of 2 RDDs to form a single RDD and then aggregate rows by date in PySpark

I have two RDDs in PySpark:

RDD1:

[(u'2013-01-31 00:00:00', u'a', u'Pab', u'abc', u'd'),(u'2013-01-31 00:00:00', u'a', u'ab', u'abc', u'g'),.....]

RDD2:

[(u'41',u'42.0'),(u'24',u'98.0'),....]

Both RDDs have the same number of rows. What I want to do is take all the columns of each row in RDD1 (converted from unicode to plain strings) and the second column of each row in RDD2 (converted from a unicode string to a float), and form a new RDD from them. The new RDD will look like this:

RDD3:

[('2013-01-31 00:00:00', 'a', 'Pab', 'abc', 'd', 42.0),('2013-01-31 00:00:00', 'a', 'ab', 'abc', 'g', 98.0),.....]

Once that is done, I want to aggregate the last value of each row (the float value) in this new RDD3 by the date in the first column. That means all rows whose date is 2013-01-31 00:00:00 should have their last numeric values added together.

How can I do this in PySpark?

asked Oct 30 '22 by Jason Donnald


1 Answer

You need to zipWithIndex your RDDs. This method pairs each element with its index, so you can then join both RDDs on that index.

Your approach should look similar to this (there may be more efficient ways):

rdd1 = sc.parallelize([u"A", u"B", u"C", u"A", u"Z"])
rdd2 = sc.parallelize(range(5))

# zipWithIndex yields (value, index); swap to (index, value) so we can join on the index
zdd1 = rdd1.zipWithIndex().map(lambda vk: (vk[1], vk[0]))
zdd2 = rdd2.zipWithIndex().map(lambda vk: (vk[1], vk[0]))

print(zdd1.join(zdd2).collect())

The output will be: [(0, (u'A', 0)), (4, (u'Z', 4)), (1, (u'B', 1)), (2, (u'C', 2)), (3, (u'A', 3))]. After this, only a map is required to recompose the data, e.g.:

combinedRDD = zdd1.join(zdd2).map(lambda kv: kv[1])
print(combinedRDD.collect())

Alternatively, if both RDDs have the same number of partitions and the same number of elements per partition, you can skip the indexing entirely and use combinedRDD = rdd1.zip(rdd2).

The output will be: [(u'A', 0), (u'Z', 4), (u'B', 1), (u'C', 2), (u'A', 3)]
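Applied to the sample rows from the question, the combine step just appends the float value from each RDD2 row to the corresponding RDD1 row. Outside Spark, the same logic can be sketched in plain Python (the `zip` here mirrors `rdd1.zip(rdd2)`; variable names are illustrative):

```python
rows1 = [(u'2013-01-31 00:00:00', u'a', u'Pab', u'abc', u'd'),
         (u'2013-01-31 00:00:00', u'a', u'ab', u'abc', u'g')]
rows2 = [(u'41', u'42.0'), (u'24', u'98.0')]

# For each pair of rows, cast RDD1's columns to plain strings and
# append the second column of the RDD2 row cast to float.
rdd3_like = [tuple(str(c) for c in r1) + (float(r2[1]),)
             for r1, r2 in zip(rows1, rows2)]
print(rdd3_like)
# [('2013-01-31 00:00:00', 'a', 'Pab', 'abc', 'd', 42.0),
#  ('2013-01-31 00:00:00', 'a', 'ab', 'abc', 'g', 98.0)]
```

In PySpark the same row-level transformation would sit inside a `map` over the zipped/joined RDD.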

About the data type conversion: I have run into that problem before, and I use this snippet to solve it.

import unicodedata

def convert(pair):
    v1, v2 = pair
    # Normalize the unicode string and strip any non-ASCII characters
    cleaned = unicodedata.normalize('NFKD', v1).encode('ascii', 'ignore').decode('ascii')
    return (cleaned, v2)

combinedRDD = combinedRDD.map(convert)
print(combinedRDD.collect())

This will output: [('A', 0), ('Z', 4), ('B', 1), ('C', 2), ('A', 3)]
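For the question's final step, summing the float values per date, the Spark-side operation would be a key-value reduction, e.g. rdd3.map(lambda row: (row[0], row[-1])).reduceByKey(lambda a, b: a + b). The equivalent logic, sketched in plain Python on illustrative data so the result is easy to verify:

```python
from collections import defaultdict

rows = [('2013-01-31 00:00:00', 'a', 'Pab', 'abc', 'd', 42.0),
        ('2013-01-31 00:00:00', 'a', 'ab', 'abc', 'g', 98.0)]

# Key on the date (first column) and sum the last column per key,
# which is what reduceByKey with addition does in Spark.
totals = defaultdict(float)
for row in rows:
    totals[row[0]] += row[-1]

print(dict(totals))
# {'2013-01-31 00:00:00': 140.0}
```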

answered Nov 09 '22 by Alberto Bonsanto