I want to use intersection() by key or filter() in Spark, but I don't know how to do an intersection() by key. So I tried filter(), but it didn't work.
For example, here are two RDDs:

data1 //RDD[(String, Int)] = Array(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1))
data2 //RDD[(String, Int)] = Array(("a", 3), ("b", 5))

Here is my attempt with filter():

val data3 = data2.map{_._1}
data1.filter{_._1 == data3}.collect //Array[(String, Int)] = Array()
I want to get the (key, value) pairs from data1 whose keys appear in data2. The result I want is:

Array(("a", 1), ("a", 2), ("b", 2), ("b", 3))
Is there a way to do this with intersection() by key, or with filter()?
For your problem, I think cogroup() is better suited. The intersection() method considers both keys and values, and since no complete (key, value) pair exists in both of your RDDs, it returns an empty RDD. Your filter() attempt returns empty for a different reason: _._1 == data3 compares each key (a String) against the RDD object data3 itself, which is never equal.
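A quick check makes this concrete (a sketch, assuming the data1 and data2 defined above):

data1.intersection(data2).collect()
// Array[(String, Int)] = Array() -- no complete (key, value) pair is shared by both RDDs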
The cogroup() function groups the values of both RDDs by key and gives us (key, (vals1, vals2)), where vals1 and vals2 contain the values of data1 and data2, respectively, for each key. Note that if a key is not present in both datasets, one of vals1 or vals2 will be empty, so we first have to filter out those tuples to arrive at the intersection of the two RDDs by key.
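To illustrate, here is roughly what the cogrouped RDD looks like for your data (a sketch; the exact collection rendering and key order may vary by Spark version):

data1.cogroup(data2).collect()
// Array[(String, (Iterable[Int], Iterable[Int]))] =
//   Array((a,(CompactBuffer(1, 2),CompactBuffer(3))),
//         (b,(CompactBuffer(2, 3),CompactBuffer(5))),
//         (c,(CompactBuffer(1),CompactBuffer())))   <- "c" has no values from data2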
Next, we grab vals1, which contains the values from data1 for the common keys, and convert each entry to the form (key, Array). Lastly we use flatMapValues() to unpack the result into individual (key, value) pairs.
val result = (data1.cogroup(data2)
  // keep only the keys that appear in both RDDs
  .filter{ case (k, (vals1, vals2)) => vals1.nonEmpty && vals2.nonEmpty }
  // keep the values that came from data1
  .map{ case (k, (vals1, vals2)) => (k, vals1.toArray) }
  // unpack each (key, Array) into individual (key, value) pairs
  .flatMapValues(identity[Array[Int]]))

result.collect()
// Array[(String, Int)] = Array((a,1), (a,2), (b,2), (b,3))
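If you would still prefer filter(), one option is to collect the keys of data2 into a local Set and filter data1 against it. This is a sketch that assumes data2's key set is small enough to fit in driver memory:

val keys = data2.keys.collect().toSet   // bring data2's keys to the driver
data1.filter{ case (k, _) => keys.contains(k) }.collect()
// Array[(String, Int)] = Array((a,1), (a,2), (b,2), (b,3))

For a larger key set you could broadcast it with sc.broadcast(keys) rather than capturing it directly in the closure.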