 

What's the difference between join and cogroup in Apache Spark

What's the difference between join and cogroup in Apache Spark? What's the use case for each method?

asked May 14 '17 by miaoiao


People also ask

What is Spark Cogroup?

In Spark, the cogroup function operates on two datasets of key-value pairs, say (K, V) and (K, W), and returns a dataset of (K, (Iterable[V], Iterable[W])) tuples. This operation is also known as groupWith.
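As a quick sketch (the pair RDDs are my own, assuming a spark-shell where sc is available), cogroup and its groupWith alias are interchangeable:

val rdd1 = sc.makeRDD(Array(("A", 1), ("B", 2)))
val rdd2 = sc.makeRDD(Array(("A", "a"), ("C", "c")))
rdd1.cogroup(rdd2)    // RDD[(String, (Iterable[Int], Iterable[String]))]
rdd1.groupWith(rdd2)  // identical result; groupWith is just another name for cogroup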

What are joins in Spark?

Spark SQL supports several join types: inner join, cross join, left outer join, right outer join, full outer join, left semi join, and left anti join. Which join to use in Spark SQL is driven by the business use case, and some joins demand considerably more resources and computation than others.
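For illustration, here is a minimal sketch (the DataFrames and column names are my own) of selecting a join type in the DataFrame API; accepted type strings include "inner", "cross", "left_outer", "right_outer", "full_outer", "left_semi" and "left_anti":

import spark.implicits._  // pre-imported in spark-shell
val left  = Seq((1, "l1"), (2, "l2")).toDF("id", "l")
val right = Seq((2, "r2"), (3, "r3")).toDF("id", "r")
left.join(right, Seq("id"), "left_anti").show()  // rows of left with no match in right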

How do I combine two large datasets in Spark?

Sticking to the use cases mentioned above, Spark will perform (or be forced by us to perform) joins in two different ways: a Sort Merge Join if we are joining two big tables, or a Broadcast Join if at least one of the datasets involved is small enough to be stored in the memory of every executor.
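As a sketch (big and small are placeholder DataFrames sharing an id column), you can nudge Spark toward the broadcast strategy with an explicit hint:

import org.apache.spark.sql.functions.broadcast
// Ask Spark to ship `small` to every executor instead of shuffling both
// sides for a sort-merge join; the plan printed by explain() should show
// a BroadcastHashJoin when the hint is honored.
big.join(broadcast(small), Seq("id")).explain()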

Is join an action in Spark?

Join is supposed to be a transformation, not an action.
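In other words (a small sketch reusing the rdd1 and rdd2 from the answer below), calling join only builds lineage lazily; nothing is computed until an action runs:

val joined = rdd1.join(rdd2)  // transformation: builds the lineage, no shuffle yet
joined.collect()              // action: triggers the shuffle and materializes the result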


1 Answer

Let me help clarify them; both are commonly used and important!

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] 

This is the signature of join; please look at it carefully. For example:

val rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")), 2)
val rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")), 2)

scala> rdd1.join(rdd2).collect
res0: Array[(String, (String, String))] = Array((A,(1,a)), (C,(3,c)))

Every key that appears in the final result must be present in both rdd1 and rdd2. This is similar to the relational database operation INNER JOIN.
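If you want the other SQL-style join flavors on RDDs, PairRDDFunctions provides those too; a quick sketch with the same rdd1 and rdd2 (result order may vary):

rdd1.leftOuterJoin(rdd2).collect
// keeps every key of rdd1, e.g. Array((B,(2,None)), (A,(1,Some(a))), (C,(3,Some(c))))
rdd1.fullOuterJoin(rdd2).collect
// keeps every key from either side; the missing side becomes None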

But cogroup is different,

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] 

If a key appears in at least one of the two RDDs, it will appear in the final result. Let me clarify with an example:

val rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")), 2)
val rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")), 2)

scala> rdd1.cogroup(rdd2).collect
res0: Array[(String, (Iterable[String], Iterable[String]))] = Array(
  (B,(CompactBuffer(2),CompactBuffer())),
  (D,(CompactBuffer(),CompactBuffer(d))),
  (A,(CompactBuffer(1),CompactBuffer(a))),
  (C,(CompactBuffer(3),CompactBuffer(c)))
)

This is very similar to the relational database operation FULL OUTER JOIN, but instead of flattening the result into one row per matched record, it gives you the Iterable interface per key; the subsequent processing is up to you, however you find it convenient!
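For example, here is a minimal sketch (the flattening logic is my own) that turns cogroup's iterables into FULL OUTER JOIN style rows of (key, (Option[V], Option[W])):

val flattened = rdd1.cogroup(rdd2).flatMap { case (k, (vs, ws)) =>
  // An empty side contributes a single None; a non-empty side contributes
  // each of its values wrapped in Some, then we take the cross product.
  val left:  Iterable[Option[String]] = if (vs.isEmpty) Iterable(None) else vs.map(Some(_))
  val right: Iterable[Option[String]] = if (ws.isEmpty) Iterable(None) else ws.map(Some(_))
  for (v <- left; w <- right) yield (k, (v, w))
}
// flattened.collect produces the same rows as rdd1.fullOuterJoin(rdd2)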

Good Luck!

The Spark docs are here: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions

answered Sep 22 '22 by ashburshui