 

How to pass one RDD in another RDD through .map

I have two rdd's, and I want to do some computation on RDD2 items for each item of rdd1. So, I am passing RDD2 in a user defined function as below but I am getting error like rdd1 cannot be passed in another rdd. Can I know how to achieve this if I want to perform operations on two rdd's?

For example:

RDD1.map(line =>function(line,RDD2))

asked Sep 26 '22 by Raghavendra Kulkarni

1 Answer

Nesting RDDs is not supported by Spark, as the error says. Usually you have to work around it by redesigning your algorithm.

How to do that depends on the actual use case: what exactly happens in function and what its output is.

Sometimes RDD1.cartesian(RDD2), doing operations per tuple and then reducing by key, will work. Sometimes, if both RDDs hold (K, V) pairs, a join between them will work.
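As a local illustration of the join approach, plain Scala collections can stand in for pair RDDs here (the names rdd1KV and rdd2KV are made up for this sketch); Spark's RDD.join produces the same (key, (value1, value2)) tuples, just distributed:

```scala
// Inner join on keys, mimicking RDD[(K, V)].join(RDD[(K, W)]).
val rdd1KV = Seq(("a", 1), ("b", 2))
val rdd2KV = Seq(("a", 10), ("c", 30))
val joined = for {
  (k1, v1) <- rdd1KV
  (k2, v2) <- rdd2KV
  if k1 == k2
} yield (k1, (v1, v2))
// joined contains only keys present in both sides: ("a", (1, 10))
```

Keys that appear on only one side ("b", "c") are dropped, which is exactly the inner-join semantics of RDD.join.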

If RDD2 is small you can always collect it in the driver, make it a broadcast variable and use that variable in function instead of RDD2.

@Edit:

For example's sake let's assume your RDDs hold strings and function would count how many times a given record from RDD1 occurs in RDD2:

import org.apache.spark.rdd.RDD

def function(line: String, rdd: RDD[String]): (String, Int) = {
  (line, rdd.filter(_ == line).count.toInt)
}

This would return an RDD[(String, Int)].

Idea1

You can try using a cartesian product using RDD's cartesian method.

val cartesianProduct = RDD1.cartesian(RDD2)                                  // creates RDD[(String, String)]
                           .map { case (r1, r2) => (r1, function2(r1, r2)) } // creates RDD[(String, Int)]
                           .reduceByKey(_ + _)                               // final RDD[(String, Int)]

Here function2 takes r1 and r2 (both strings) and returns 1 if they are equal and 0 otherwise. The final reduceByKey results in an RDD of tuples where the key is a record from RDD1 and the value is the total count.
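function2 is not defined in the answer; a minimal version matching that description would be:

```scala
// Returns 1 when the two records match, 0 otherwise; summing these
// per key via reduceByKey yields how often each RDD1 record occurs in RDD2.
def function2(r1: String, r2: String): Int =
  if (r1 == r2) 1 else 0
```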

Problem1: This would NOT work if you have duplicate strings in RDD1, though. You'd have to account for that; if RDD1 records have some unique ids, that would be perfect.
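One way to get such unique ids is Spark's RDD.zipWithUniqueId. A local sketch with plain Scala collections (zipWithIndex plays the same role here) shows how duplicates become distinct keys:

```scala
// Tag every record with an id so duplicate strings form distinct keys.
val records = Seq("x", "y", "x")
val keyed = records.zipWithIndex.map { case (s, i) => ((s, i), s) }
// keys: ("x", 0), ("y", 1), ("x", 2) — all distinct despite the repeated "x"
```

After reducing by the (string, id) key, the counts for the two "x" records stay separate instead of being merged.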

Problem2: this does create A LOT of pairs (for 1 million records in each RDD it would create around a trillion pairs), would be slow and most probably cause a lot of shuffling.

Idea2

I didn't understand your comment regarding RDD2's size ("lacs"), so this might or might not work:

val rdd2array = sc.broadcast(RDD2.collect())
// function would need a variant taking Array[String] instead of RDD[String]
val result = RDD1.map(line => function(line, rdd2array.value))

Problem: this might blow up your memory. collect() is called on the driver, so all records from RDD2 will be loaded into memory on the driver node (and a copy shipped to every executor).

Idea3

Depending on the use case there are other ways to overcome this; for instance, the brute-force algorithm for similarity search is similar (pun not intended) to your use case. One alternative solution there is Locality Sensitive Hashing.

answered Sep 29 '22 by Mateusz Dymczyk