I have two RDDs in PySpark:
RDD1:
[(u'2013-01-31 00:00:00', u'a', u'Pab', u'abc', u'd'),(u'2013-01-31 00:00:00', u'a', u'ab', u'abc', u'g'),.....]
RDD2:
[(u'41',u'42.0'),(u'24',u'98.0'),....]
Both RDDs have the same number of rows. What I want to do is take all the columns in each row from RDD1 (converted from unicode to normal strings) and the 2nd column from each row in RDD2 (converted from a unicode string to a float) and form a new RDD from them. The new RDD will look like this:
RDD3:
[('2013-01-31 00:00:00', 'a', 'Pab', 'abc', 'd',42.0),('2013-01-31 00:00:00', 'a', 'ab', 'abc', 'g',98.0),.....]
Once that is done, I want to aggregate the last value in each row (the float value) of this new RDD3 by the date value in the 1st column. That means that for all the rows where the date is 2013-01-31 00:00:00, the last numeric values should be added together.
How can I do this in PySpark?
You need to zipWithIndex your RDDs. This method creates a tuple of your data and a second value that represents the index of that entry, so you can join both RDDs by index.
Your approach should be similar to (I bet there are more efficient ways):
rdd1 = sc.parallelize([u"A", u"B", u"C", u"A", u"Z"])
rdd2 = sc.parallelize(xrange(5))
zdd1 = rdd1.zipWithIndex().map(lambda (v, k): (k, v))
zdd2 = rdd2.zipWithIndex().map(lambda (v, k): (k, v))
print zdd1.join(zdd2).collect()
The output will be:
[(0, (u'A', 0)), (4, (u'Z', 4)), (1, (u'B', 1)), (2, (u'C', 2)), (3, (u'A', 3))]
After this, only a map is required to recompose the data, for example:
combinedRDD = zdd1.join(zdd2).map(lambda (k, v): v)
print combinedRDD.collect()
# Alternatively, you can use the .zip method: combinedRDD = rdd1.zip(rdd2)
The output will be:
[(u'A', 0), (u'Z', 4), (u'B', 1), (u'C', 2), (u'A', 3)]
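Note that the join output above is no longer in the original row order, because join shuffles the data by key. If the order matters, a minimal sketch along these lines sorts by the index before dropping it. The helper names `swap` and `join_by_index` are hypothetical, the function assumes it receives two PySpark RDDs with the same number of elements, and it avoids tuple-unpacking lambdas so that it runs on both Python 2 and Python 3:

```python
def swap(pair):
    """Turn a (value, index) pair from zipWithIndex into (index, value)."""
    value, index = pair
    return (index, value)


def join_by_index(rdd1, rdd2):
    """Pair up the rows of two RDDs by position, keeping the original order.

    Assumption: both arguments are PySpark RDDs with the same number of elements.
    """
    indexed1 = rdd1.zipWithIndex().map(swap)
    indexed2 = rdd2.zipWithIndex().map(swap)
    # join() shuffles by key, so sort on the index before discarding it.
    return indexed1.join(indexed2).sortByKey().values()
```

The shorter rdd1.zip(rdd2) gives the same pairing without a shuffle, but it assumes both RDDs have the same number of partitions and the same number of elements in each partition.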
As for the data type conversion, I have had that problem before, and I use this snippet to solve it:
import unicodedata
convert = lambda (v1, v2): (unicodedata.normalize('NFKD', v1).encode('ascii', 'ignore'), v2)
combinedRDD = combinedRDD.map(convert)
print combinedRDD.collect()
The output will be:
[('A', 0), ('Z', 4), ('B', 1), ('C', 2), ('A', 3)]
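To get from the paired rows to the RDD3 and the per-date totals asked about in the question, something like the following might work. It is only a sketch: the function names (`merge_row`, `date_and_amount`, `add`, `totals_by_date`) are made up for illustration, and it assumes each element is a (row from RDD1, row from RDD2) pair, as produced by the index join above or by rdd1.zip(rdd2). The text columns are left unchanged here; on Python 2 they could additionally be passed through the unicodedata normalization shown above.

```python
def merge_row(pair):
    """Combine one RDD1 row with the matching RDD2 row.

    `pair` is (rdd1_row, rdd2_row); the result is all RDD1 columns
    followed by the second RDD2 column converted to a float.
    """
    row1, row2 = pair
    return tuple(row1) + (float(row2[1]),)


def date_and_amount(row):
    """Key a merged row by its first column (the date), with the last column as value."""
    return (row[0], row[-1])


def add(a, b):
    """Reducer used to sum the float values that share a date."""
    return a + b


def totals_by_date(rdd3):
    """Sum the trailing float of each merged row per date.

    Assumption: rdd3 is a PySpark RDD of rows built with merge_row.
    """
    return rdd3.map(date_and_amount).reduceByKey(add)
```

With the data from the question, usage would be roughly rdd3 = rdd1.zip(rdd2).map(merge_row) followed by totals_by_date(rdd3).collect(). For just the two rows shown in the question, the total for 2013-01-31 00:00:00 would be 42.0 + 98.0 = 140.0.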