
How to find the intersection of two rdd's by keys in pyspark?

I have two RDDs:

rdd1 = sc.parallelize([("www.page1.html", "word1"), ("www.page2.html", "word1"), 
    ("www.page1.html", "word3")])

rdd2 = sc.parallelize([("www.page1.html", 7.3), ("www.page2.html", 1.25), 
    ("www.page3.html", 5.41)])

intersection_rdd = rdd1.keys().intersection(rdd2.keys())       

When I do this I get only the intersection of the keys, i.e. (www.page1.html, www.page2.html).

But I need the keys along with the values from both RDDs. The output should look like this:

[www.page1.html, (word1, word3, 7.3)]

[www.page2.html, (word1, 1.25)]
asked Mar 14 '23 by anvesh

2 Answers

You can, for example, cogroup and filter:

## This relies on an empty resultiterable.ResultIterable
## evaluating to False

intersection_rdd = rdd1.cogroup(rdd2).filter(lambda x: x[1][0] and x[1][1])
intersection_rdd.map(lambda x: (x[0], (list(x[1][0]), list(x[1][1])))).collect()

## [('www.page1.html', (['word1', 'word3'], [7.3])),
##  ('www.page2.html', (['word1'], [1.25]))]
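
If you want each key's values flattened into a single tuple, as in the expected output in the question, here is a minimal sketch building on the cogrouped RDD above (the order of values inside each tuple may vary):

## Flatten both ResultIterables into a single tuple per key
flat_rdd = intersection_rdd.mapValues(lambda v: tuple(list(v[0]) + list(v[1])))
flat_rdd.collect()

## [('www.page1.html', ('word1', 'word3', 7.3)),
##  ('www.page2.html', ('word1', 1.25))]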
answered Mar 16 '23 by zero323


Since you are applying the set operation only to the keys, your output contains only the keys.

rdd1.union(rdd2).groupByKey().mapValues(tuple).collect()

Union:

('www.page1.html', 'word1')
('www.page2.html', 'word1')
('www.page1.html', 'word3')
('www.page1.html', 7.3)
('www.page2.html', 1.25)
('www.page3.html', 5.41)

GroupByKey:

('www.page1.html', ['word1', 'word3', 7.3])
('www.page2.html', ['word1', 1.25])
('www.page3.html', [5.41])

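Note that this keeps www.page3.html even though it appears only in rdd2. If you need only the keys present in both RDDs, here is a minimal sketch that reuses the key intersection from the question (output order may vary):

## Restrict the grouped result to keys that occur in both RDDs
common = rdd1.keys().intersection(rdd2.keys()).map(lambda k: (k, None))

(rdd1.union(rdd2)
     .groupByKey()
     .mapValues(tuple)
     .join(common)                  ## drops keys present in only one RDD
     .mapValues(lambda v: v[0])     ## strip the placeholder None
     .collect())

## [('www.page1.html', ('word1', 'word3', 7.3)),
##  ('www.page2.html', ('word1', 1.25))]
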
answered Mar 16 '23 by alwaysprep