
Spark: what's the best strategy for joining a 2-tuple-key RDD with single-key RDD?

I have two RDDs that I want to join, and they look like this:

val rdd1: RDD[(T, U)]
val rdd2: RDD[((T, W), V)]

It happens to be the case that the key values of rdd1 are unique and also that the tuple-key values of rdd2 are unique. I'd like to join the two data sets so that I get the following rdd:

val rdd_joined:RDD[((T,W), (U,V))] 
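To pin down these semantics, here is a small local model that uses plain Scala collections instead of RDDs, so no Spark is needed. It is only a sketch: the names JoinSemantics and joinOnFirst are hypothetical, it relies on the uniqueness of rdd1's keys stated above, and it assumes inner-join behavior (an rdd2 row whose t has no match in rdd1 is dropped), which the question does not specify. It can serve as a reference when checking a distributed version on small sample data.

```scala
// Local (non-Spark) model of the desired join, using plain Scala collections.
object JoinSemantics {
  // Hypothetical helper: joins on the first component of the right side's tuple key.
  // Assumes the left side's keys are unique, so toMap loses nothing.
  def joinOnFirst[T, U, W, V](
      left: Seq[(T, U)],
      right: Seq[((T, W), V)]): Seq[((T, W), (U, V))] = {
    val lookup: Map[T, U] = left.toMap
    // Inner-join assumption: rows of `right` whose t is absent from `left` are dropped.
    right.flatMap { case ((t, w), v) => lookup.get(t).map(u => ((t, w), (u, v))) }
  }

  def main(args: Array[String]): Unit = {
    val left = Seq((1, "A"), (2, "B"), (3, "C"))
    val right = Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((2, "Y"), 222), ((3, "X"), 333))
    joinOnFirst(left, right).foreach(println)
  }
}
```

Because every (t, w) key of rdd2 is unique and each t matches at most one rdd1 row, the result has at most one row per rdd2 row.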

What's the most efficient way to achieve this? Here are a few ideas I've thought of.

Option 1:

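val m = rdd1.collectAsMap()
// look up each rdd2 row's t in the local map; rows with no match in rdd1 are dropped
val rdd_joined = rdd2.flatMap({case ((t,w), v) => m.get(t).map(u => ((t,w), (u, v)))})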

Option 2:

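val distinct_w = rdd2.map({case ((t,w), v) => w}).distinct()
// cartesian yields ((t, u), w); re-key by (t, w) so it lines up with rdd2's key
val rdd_joined = rdd1.cartesian(distinct_w).map({case ((t,u), w) => ((t,w), u)}).join(rdd2)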

Option 1 will collect all of rdd1 to the master, right? So that doesn't seem like a good option if rdd1 is large (it's relatively large in my case, although an order of magnitude smaller than rdd2). Option 2 does an ugly distinct and cartesian product, which also seems very inefficient. Another possibility that crossed my mind (but I haven't tried yet) is to do Option 1 and broadcast the map, although it would be better to broadcast in a "smart" way so that the keys of the map are co-located with the keys of rdd2.

Has anyone come across this sort of situation before? I'd be happy to have your thoughts.


RyanH asked Jul 12 '13 18:07



1 Answer

One option is to perform a broadcast join by collecting rdd1 to the driver and broadcasting it to all mappers; done correctly, this will let us avoid an expensive shuffle of the large rdd2 RDD:

val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C")))
val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((2, "Y"), 222), ((3, "X"), 333)))

// ship rdd1 to the executors as a read-only lookup map
val rdd1Broadcast = sc.broadcast(rdd1.collectAsMap())
val joined = rdd2.mapPartitions({ iter =>
  val m = rdd1Broadcast.value
  for {
    ((t, w), u) <- iter
    if m.contains(t)   // inner join: skip rows with no match in rdd1
  } yield ((t, w), (u, m(t)))
}, preservesPartitioning = true)

The preservesPartitioning = true flag tells Spark that this function doesn't modify the keys of rdd2, so the result keeps rdd2's partitioner (if rdd2 has one). This allows Spark to avoid re-partitioning the result for any subsequent operations that join based on the (t, w) key.
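To see the effect of this flag, here is a minimal sketch that assumes spark-core on the classpath and a local master. The object name PreservePartitioningDemo and the partitionBy(new HashPartitioner(4)) step are illustrative assumptions; the explicit partitioner is added because an RDD built by parallelize alone has no partitioner to preserve. The sketch runs the same broadcast join with the flag on and off and returns the resulting partitioners.

```scala
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}

object PreservePartitioningDemo {
  // Returns (rdd2's partitioner, partitioner with the flag on, partitioner with it off).
  def partitioners(sc: SparkContext): (Option[Partitioner], Option[Partitioner], Option[Partitioner]) = {
    val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C")))
    // Assumption for the demo: give rdd2 an explicit partitioner on its (t, w) key.
    val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((2, "Y"), 222), ((3, "X"), 333)))
      .partitionBy(new HashPartitioner(4))
    val rdd1Broadcast = sc.broadcast(rdd1.collectAsMap())

    // Broadcast join as above; the (t, w) keys pass through unchanged either way.
    def joinWith(preserve: Boolean) = rdd2.mapPartitions({ iter =>
      val m = rdd1Broadcast.value
      iter.collect { case ((t, w), u) if m.contains(t) => ((t, w), (u, m(t))) }
    }, preservesPartitioning = preserve)

    (rdd2.partitioner, joinWith(preserve = true).partitioner, joinWith(preserve = false).partitioner)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("preserve-partitioning").setMaster("local[2]"))
    try {
      val (base, kept, dropped) = partitioners(sc)
      println(s"rdd2: $base, preserved: $kept, not preserved: $dropped")
    } finally {
      sc.stop()
    }
  }
}
```

With the flag on, the result reports rdd2's HashPartitioner; with it off, Spark has to assume the keys may have changed and reports None.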

This broadcast could be inefficient since it involves a communications bottleneck at the driver. In principle, it's possible to broadcast one RDD to another without involving the driver; I have a prototype of this that I'd like to generalize and add to Spark.

Another option is to re-map the keys of rdd2 and use the Spark join method; this will involve a full shuffle of rdd2 (and possibly rdd1):

rdd1.join(rdd2.map {
  case ((t, w), u) => (t, (w, u))
}).map {
  case (t, (v, (w, u))) => ((t, w), (u, v))
}.collect()

On my sample input, both of these methods produce the same result:

res1: Array[((Int, java.lang.String), (Int, java.lang.String))] = Array(((1,Z),(111,A)), ((1,ZZ),(111,A)), ((2,Y),(222,B)), ((3,X),(333,C))) 

A third option would be to restructure rdd2 so that t is its key from the start (i.e. store it as RDD[(T, (W, V))]), then perform the above join without the extra re-keying map.
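A sketch of that third option taken one step further: if both sides are keyed by t and partitioned with the same partitioner, the join itself needs no additional shuffle, which is close to the "smart" co-location the question asks about. This assumes spark-core on the classpath; the helper name joinCoPartitioned and the partition count are hypothetical. Note that partitionBy still shuffles each side once, so the payoff comes when the partitioned, cached RDDs are reused across several joins.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CoPartitionedJoin {
  // Hypothetical helper: re-keys rdd2 by t and hash-partitions both sides identically,
  // so join() sees matching partitioners and uses narrow (one-to-one) dependencies.
  def joinCoPartitioned(
      rdd1: RDD[(Int, String)],
      rdd2: RDD[((Int, String), Int)],
      numPartitions: Int): RDD[((Int, String), (Int, String))] = {
    val part = new HashPartitioner(numPartitions)
    // partitionBy shuffles once per side; cache() keeps the partitioned data for reuse.
    val left = rdd1.partitionBy(part).cache()
    val rightByT = rdd2.map { case ((t, w), u) => (t, (w, u)) }.partitionBy(part).cache()
    // Same output shape as the answer: ((t, w), (rdd2 value, rdd1 value)).
    left.join(rightByT).map { case (t, (v, (w, u))) => ((t, w), (u, v)) }
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("co-partitioned-join").setMaster("local[2]"))
    try {
      val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C")))
      val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((2, "Y"), 222), ((3, "X"), 333)))
      joinCoPartitioned(rdd1, rdd2, 4).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Whether this beats the broadcast join depends on how often rdd1 and rdd2 are joined: for a single join, the broadcast approach avoids shuffling rdd2 at all.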

Josh Rosen answered Oct 03 '22 08:10
