I have two RDDs that I want to join, and they look like this:
val rdd1: RDD[(T, U)]
val rdd2: RDD[((T, W), V)]
It happens to be the case that the key values of rdd1 are unique, and also that the tuple-key values of rdd2 are unique. I'd like to join the two data sets so that I get the following RDD:

val rdd_joined: RDD[((T, W), (U, V))]
What's the most efficient way to achieve this? Here are a few ideas I've thought of.
Option 1:
val m = rdd1.collectAsMap
val rdd_joined = rdd2.map({ case ((t, w), u) => ((t, w), (u, m.get(t))) })
Option 2:
val distinct_w = rdd2.map({ case ((t, w), u) => w }).distinct
val rdd_joined = rdd1.cartesian(distinct_w)
  .map({ case ((t, u), w) => ((t, w), u) })
  .join(rdd2)
Option 1 will collect all of the data to the master, right? So that doesn't seem like a good option if rdd1 is large (it's relatively large in my case, although an order of magnitude smaller than rdd2). Option 2 does an ugly distinct and cartesian product, which also seems very inefficient. Another possibility that crossed my mind (but that I haven't tried yet) is to do Option 1 and broadcast the map, although it would be better to broadcast in a "smart" way so that the keys of the map are co-located with the keys of rdd2.
Has anyone come across this sort of situation before? I'd be happy to have your thoughts.
Thanks!
cogroup() can be used for much more than just implementing joins. We can also use it to implement intersect by key. Additionally, cogroup() can work on three or more RDDs at once.
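For example, here's a rough sketch of an intersect-by-key built on cogroup(); rddA and rddB are made-up sample inputs, not the RDDs from the question:

val rddA = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
val rddB = sc.parallelize(Seq((2, 2.0), (3, 3.0), (4, 4.0)))

// cogroup() yields (key, (values from rddA, values from rddB)); keeping only
// the keys where both sides are non-empty intersects the two RDDs by key.
val intersectedByKey = rddA.cogroup(rddB)
  .filter { case (_, (vs, ws)) => vs.nonEmpty && ws.nonEmpty }
  .keys

intersectedByKey.collect()  // contains 2 and 3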
The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value.
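As a small illustration (the path below is just a placeholder):

// Ask Spark for at least 8 partitions when reading the file; the default is
// one partition per HDFS block.
val lines = sc.textFile("hdfs:///path/to/data.txt", 8)
lines.partitions.length  // >= 8, depending on the input splits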
join combines two datasets: when called on datasets of type (K, V) and (K, W), it returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
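A quick sketch with made-up data:

val kv = sc.parallelize(Seq((1, "A"), (2, "B")))
val kw = sc.parallelize(Seq((1, 1.0), (3, 3.0)))

kv.join(kw).collect()           // (1,(A,1.0))
kv.leftOuterJoin(kw).collect()  // (1,(A,Some(1.0))) and (2,(B,None)), in some order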
One option is to perform a broadcast join by collecting rdd1 to the driver and broadcasting it to all mappers; done correctly, this will let us avoid an expensive shuffle of the large rdd2 RDD:
val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C")))
val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((2, "Y"), 222), ((3, "X"), 333)))

val rdd1Broadcast = sc.broadcast(rdd1.collectAsMap())

val joined = rdd2.mapPartitions({ iter =>
  val m = rdd1Broadcast.value
  for {
    ((t, w), u) <- iter
    if m.contains(t)
  } yield ((t, w), (u, m.get(t).get))
}, preservesPartitioning = true)
The preservesPartitioning = true tells Spark that this map function doesn't modify the keys of rdd2; this will allow Spark to avoid re-partitioning rdd2 for any subsequent operations that join based on the (t, w) key.
This broadcast could be inefficient since it involves a communications bottleneck at the driver. In principle, it's possible to broadcast one RDD to another without involving the driver; I have a prototype of this that I'd like to generalize and add to Spark.
Another option is to re-map the keys of rdd2 and use the Spark join method; this will involve a full shuffle of rdd2 (and possibly rdd1):
rdd1.join(rdd2.map {
  case ((t, w), u) => (t, (w, u))
}).map {
  case (t, (v, (w, u))) => ((t, w), (u, v))
}.collect()
On my sample input, both of these methods produce the same result:
res1: Array[((Int, java.lang.String), (Int, java.lang.String))] = Array(((1,Z),(111,A)), ((1,ZZ),(111,A)), ((2,Y),(222,B)), ((3,X),(333,C)))
A third option would be to restructure rdd2 so that t is its key, then perform the above join.
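Here's a minimal sketch of that third option, reusing the sample rdd1 and rdd2 from above: rdd2 is re-keyed by t up front (with w folded into the value), the join runs on t, and the (t, w) key is rebuilt afterwards.

// Keep rdd2 keyed by t so the join on t can reuse that key structure.
val rdd2ByT = rdd2.map { case ((t, w), u) => (t, (w, u)) }
val joined = rdd2ByT.join(rdd1)                        // RDD[(Int, ((String, Int), String))]
  .map { case (t, ((w, u), v)) => ((t, w), (u, v)) }   // back to ((t, w), (u, v))
joined.collect()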