How to use Spark intersection() by key or filter() with two RDDs?

I want to intersect two RDDs by key in Spark, using either intersection() or filter().

But I don't know how to make intersection() work by key.

So I tried filter(), but it didn't work.

Example - here are two RDDs:

// sc is the SparkContext (predefined in spark-shell)
val data1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1))) // RDD[(String, Int)]
val data2 = sc.parallelize(Seq(("a", 3), ("b", 5))) // RDD[(String, Int)]

val data3 = data2.map{_._1}

data1.filter{_._1 == data3}.collect //Array[(String, Int)] = Array()

I want to keep the (key, value) pairs of data1 whose key also appears in data2.

Array(("a", 1), ("a", 2), ("b", 2), ("b", 3)) is the result I want.

Is there a way to solve this problem using intersection() by key or filter()?

asked Mar 09 '23 20:03 by S.Kang


1 Answer

For your problem, I think cogroup() is better suited. The intersection() method compares whole elements, that is, both the key and the value of each pair. Since data1 and data2 have no (key, value) pair in common, it returns an empty RDD.
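To make the difference concrete, here is a minimal sketch run against the data from the question. The local SparkSession setup and the names `wholePairs` and `sharedKeys` are assumptions for illustration; in spark-shell, `sc` already exists.

```scala
import org.apache.spark.sql.SparkSession

// Assumed local setup; getOrCreate() reuses an existing session if there is one.
val spark = SparkSession.builder().master("local[*]").appName("intersection-demo").getOrCreate()
val sc = spark.sparkContext

val data1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1)))
val data2 = sc.parallelize(Seq(("a", 3), ("b", 5)))

// intersection() compares whole (key, value) tuples; no tuple occurs in both RDDs.
val wholePairs = data1.intersection(data2).collect()

// Intersecting only the keys does find the shared ones (sorted for a stable order).
val sharedKeys = data1.keys.intersection(data2.keys).collect().sorted
```

Here `wholePairs` is empty, while `sharedKeys` holds the two common keys "a" and "b". The key intersection alone drops the values, which is why a by-key operation is still needed.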

The function cogroup() groups the values of both RDDs by key and gives us (key, (vals1, vals2)), where vals1 and vals2 contain the values of data1 and data2 respectively, for each key. Note that if a key is present in only one of the two datasets, the other side (vals1 or vals2) will be returned as an empty Iterable, so we first have to filter out these tuples to arrive at the intersection of the two RDDs.
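As a rough illustration of what cogroup() returns before any filtering, the sketch below collects the grouped result for the data from the question. The local SparkSession setup and the name `grouped` are assumptions; the values are converted to sorted lists only so that the output is easy to read and stable.

```scala
import org.apache.spark.sql.SparkSession

// Assumed local setup; in spark-shell, sc is already defined.
val spark = SparkSession.builder().master("local[*]").appName("cogroup-demo").getOrCreate()
val sc = spark.sparkContext

val data1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1)))
val data2 = sc.parallelize(Seq(("a", 3), ("b", 5)))

// One record per key, holding the values from data1 and from data2.
val grouped = data1.cogroup(data2)
  .mapValues { case (vals1, vals2) => (vals1.toList.sorted, vals2.toList.sorted) }
  .collect()
  .sortBy(_._1)
// grouped: Array((a,(List(1, 2),List(3))), (b,(List(2, 3),List(5))), (c,(List(1),List())))
```

The record for "c" has an empty second list because "c" does not occur in data2; that is the case the nonEmpty filter removes.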

Next, we'll grab vals1 - which contains the values from data1 for the common keys - and convert it to the format (key, Array). Lastly, we use flatMapValues() to unpack the result into (key, value) pairs.

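val result = (data1.cogroup(data2)
  // keep only the keys that have values in both RDDs
  .filter{case (k, (vals1, vals2)) => vals1.nonEmpty && vals2.nonEmpty }
  // keep the values that came from data1
  .map{case (k, (vals1, vals2)) => (k, vals1.toArray)}
  // unpack each (key, Array) into individual (key, value) pairs
  .flatMapValues(identity[Array[Int]]))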

result.collect()
// Array[(String, Int)] = Array((a,1), (a,2), (b,2), (b,3))
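If you would rather stay with filter(), one possible alternative is sketched below. The attempt in the question compares each key (a String) with data3 (an RDD), which is never true, hence the empty result. This sketch assumes that the distinct keys of data2 are few enough to be collected to the driver; the names `keySet` and `filtered` and the local SparkSession setup are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Assumed local setup; in spark-shell, sc is already defined.
val spark = SparkSession.builder().master("local[*]").appName("filter-by-key-demo").getOrCreate()
val sc = spark.sparkContext

val data1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1)))
val data2 = sc.parallelize(Seq(("a", 3), ("b", 5)))

// Assumption: data2's keys fit in driver memory, so they can be collected and broadcast.
val keySet = sc.broadcast(data2.keys.collect().toSet)

// Keep the pairs of data1 whose key is in the broadcast set.
val filtered = data1.filter { case (k, _) => keySet.value.contains(k) }.collect()
```

This avoids the shuffle that cogroup() performs, at the cost of holding the key set in memory on every executor. When data2 has too many keys for that, the cogroup() approach above is the safer choice.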
answered Apr 06 '23 07:04 by mtoto