I have two RDDs, and I want to do some computation on RDD2's items for each item of RDD1. So I am passing RDD2 into a user-defined function as below, but I am getting an error along the lines of "rdd1 cannot be passed in another rdd". How can I achieve this if I want to perform operations on two RDDs?
For example:
RDD1.map(line =>function(line,RDD2))
Nesting RDDs is not supported by Spark, as the error says. Usually you have to work around it by redesigning your algorithm. How to do that depends on the actual use case: what exactly happens in function and what its output is.

Sometimes an RDD1.cartesian(RDD2), doing operations per tuple and then reducing by key, will work. Sometimes, if you have (K, V) types, a join between the two RDDs will work (a small sketch follows).
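As a minimal sketch of the join route (the key type, the sample data, and the sc variable from the Spark shell are assumptions for illustration):

import org.apache.spark.rdd.RDD

// hypothetical keyed RDDs; the data is made up for illustration
val kv1: RDD[(String, Int)] = sc.parallelize(Seq(("a", 1), ("b", 2)))
val kv2: RDD[(String, Int)] = sc.parallelize(Seq(("a", 10), ("c", 30)))

// join pairs up values that share a key: RDD[(String, (Int, Int))]
val joined = kv1.join(kv2) // here: ("a", (1, 10))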
If RDD2 is small you can always collect it on the driver, make it a broadcast variable, and use that variable in function instead of RDD2.
@Edit:

For example's sake, let's assume your RDDs hold strings and that function would count how many times a given record from RDD1 occurs in RDD2:
def function(line: String, rdd: RDD[String]): (String, Int) = {
  // count() returns a Long, so convert to Int to match the declared return type
  (line, rdd.filter(_ == line).count().toInt)
}
Used in a map over RDD1, this would conceptually return an RDD[(String, Int)], but Spark does not allow the nesting; the ideas below work around it.
Idea1

You can try computing a cartesian product with RDD's cartesian method.
val cartesianProduct = RDD1.cartesian(RDD2)           // creates RDD[(String, String)]
  .map { case (r1, r2) => (r1, function2(r1, r2)) }   // creates RDD[(String, Int)]
  .reduceByKey(_ + _)                                 // final RDD[(String, Int)]
Here function2 takes r1 and r2 (which are strings) and returns 1 if they are equal and 0 if not; a sketch of it follows below. The reduceByKey then leaves an RDD of tuples where the key is a record from RDD1 and the value is the total count.
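A minimal sketch of such a function2, assuming the equality semantics described above:

// returns 1 when the two records match, 0 otherwise (assumed semantics)
def function2(r1: String, r2: String): Int = if (r1 == r2) 1 else 0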
Problem1: this would NOT work if you have duplicate strings in RDD1, though, because reduceByKey would merge their counts. You'd have to think about it; if RDD1 records have some unique ids, that would be perfect. One way to get such ids is sketched below.
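A hedged workaround if no natural ids exist: tag every RDD1 record with a synthetic unique id via zipWithUniqueId before the cartesian, so duplicate strings keep separate counts:

// give every RDD1 record a unique id so duplicate strings stay distinct
val keyedRDD1 = RDD1.zipWithUniqueId()                    // RDD[(String, Long)]
val counts = keyedRDD1.cartesian(RDD2)
  .map { case ((r1, id), r2) => ((id, r1), function2(r1, r2)) }
  .reduceByKey(_ + _)                                     // RDD[((Long, String), Int)]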
Problem2: this does create A LOT of pairs (for 1mln records in each RDD the cartesian product is around 10^12, i.e. a trillion, pairs), would be slow, and would most probably result in a lot of shuffling.
Idea2

I didn't understand your comment regarding RDD2's size ("lacs", i.e. hundreds of thousands of records), so this might or might not work:
val rdd2array = sc.broadcast(RDD2.collect())   // Broadcast[Array[String]]
// function has to be adapted to scan the broadcast array instead of an RDD
val result = RDD1.map(line => (line, rdd2array.value.count(_ == line)))
Problem: this might blow up your memory. collect() is called on the driver, and all records from RDD2 will be loaded into memory on the driver node.
Idea3

Depending on the use case there are other ways to overcome this; for instance, the brute-force algorithm for Similarity Search is similar (pun not intended) to your use case. One alternative solution there is Locality Sensitive Hashing.