pyspark join rdds by a specific key

Tags: join, rdd, pyspark

I have two RDDs that I need to join together. They look like the following:

RDD1

[(u'2', u'100', 2),
 (u'1', u'300', 1),
 (u'1', u'200', 1)]

RDD2

[(u'1', u'2'), (u'1', u'3')]

My desired output is:

[(u'1', u'2', u'100', 2)]

So I would like to select the tuples from RDD2 whose second value matches the first value of a tuple in RDD1, and combine them. I have tried join and also cartesian, and neither is working; I'm not getting even close to what I am looking for. I am new to Spark and would appreciate any help from you guys.

Thanks

asked Mar 15 '17 by dagg3r

People also ask

Can we perform join on RDD?

The join() operation on RDDs uses a (key, value) paradigm to find the intersection between sets. Therefore, before performing the join, we should format our datasets so that they conform to the (key, value) format required by Spark (following the MapReduce paradigm).
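For instance, a minimal sketch (the data here is illustrative, assuming an existing SparkContext sc):

left = sc.parallelize([('a', 1), ('b', 2)])
right = sc.parallelize([('a', 'x'), ('c', 'y')])
# join() matches pair RDDs on the first tuple element (the key)
print(left.join(right).collect())  # [('a', (1, 'x'))]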

How do you sort RDD by value?

sortBy() is used to sort data by value efficiently in PySpark. It is a method available on RDDs, and it takes a lambda expression that extracts the field to sort by.
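For instance, a minimal sketch (the data here is illustrative, assuming an existing SparkContext sc):

rdd = sc.parallelize([('a', 3), ('b', 1), ('c', 2)])
# sort by the second field of each tuple, descending
print(rdd.sortBy(lambda x: x[1], ascending=False).collect())
# [('a', 3), ('c', 2), ('b', 1)]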


1 Answer

DataFrame: if you allow using Spark DataFrames in the solution, you can turn the given RDDs into DataFrames and join them on the corresponding column.

df1 = spark.createDataFrame(rdd1, schema=['a', 'b', 'c'])  # RDD1: join key is column 'a'
df2 = spark.createDataFrame(rdd2, schema=['d', 'a'])       # RDD2: join key is its second element
df_join = df1.join(df2, on='a')
# reorder columns to match the desired output and convert Rows back to tuples
out = df_join.select('d', 'a', 'b', 'c').rdd.map(tuple).collect()
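Note that joining with on='a' keeps a single copy of the join column, so the select above only reorders the remaining columns; with the sample data this should yield [(u'1', u'2', u'100', 2)].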

RDD: just move the key you want to join on into the first tuple position, then simply use join() to do the joining:

rdd1_zip = rdd1.map(lambda x: (x[0], (x[1], x[2])))  # key RDD1 by its first element
rdd2_zip = rdd2.map(lambda x: (x[1], x[0]))          # key RDD2 by its second element
rdd_join = rdd1_zip.join(rdd2_zip)
# flatten and reorder the joined tuples to match the desired output
rdd_out = rdd_join.map(lambda x: (x[1][1], x[0], x[1][0][0], x[1][0][1])).collect()
print(rdd_out)  # [(u'1', u'2', u'100', 2)]
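Either approach gives the same result; the DataFrame version avoids the manual tuple restructuring, while the pure RDD version keeps everything in the (key, value) paradigm described above.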
answered Oct 13 '22 by titipata