 

Join two ordinary RDDs with/without Spark SQL

I need to join two ordinary RDDs on one or more columns. Logically this operation is equivalent to a database join of two tables. I wonder whether this is possible only through Spark SQL or whether there are other ways of doing it.

As a concrete example, consider RDD r1 with primary key ITEM_ID:

(ITEM_ID, ITEM_NAME, ITEM_UNIT, COMPANY_ID) 

and RDD r2 with primary key COMPANY_ID:

(COMPANY_ID, COMPANY_NAME, COMPANY_CITY) 

I want to join r1 and r2.

How can this be done?

asked Dec 12 '14 by learning_spark


People also ask

Can we perform join on RDD?

Yes. RDD.join returns an RDD containing all pairs of elements with matching keys in self and other. Each pair of elements is returned as a (k, (v1, v2)) tuple, where (k, v1) is in self and (k, v2) is in other. It performs a hash join across the cluster.
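As a minimal illustration of that contract (a sketch for the Scala API, using spark-shell's built-in sc):

val left = sc.parallelize(Seq(("a", 1), ("b", 2)))
val right = sc.parallelize(Seq(("a", "x"), ("a", "y")))

// Only matching keys survive an inner join; each match becomes (k, (v1, v2))
left.join(right).collect()
// e.g. Array((a,(1,x)), (a,(1,y))) -- key "b" has no match and is dropped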

What is the advantage of using Apache spark SQL over RDDs?

Apache Spark, an open-source big data framework, has several advantages over other big data solutions: it is dynamic in nature, it supports in-memory computation of RDDs, and it provides reusability, fault tolerance, real-time stream processing, and more.
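To make the Spark SQL side of the question concrete, here is a rough sketch of the ITEM/COMPANY join expressed through Spark SQL (assuming Spark 1.3+ DataFrames; the case class and field names are my own guesses at the question's schema, not taken from the original page):

case class Item(itemId: Int, itemName: String, itemUnit: Int, companyId: String)
case class Company(companyId: String, companyName: String, companyCity: String)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

// Sample data standing in for the question's two RDDs
val items = sc.parallelize(Seq(Item(1, "first", 2, "c1"), Item(2, "second", 2, "c1")))
val companies = sc.parallelize(Seq(Company("c1", "company-1", "city-1")))

// Register the RDDs as temporary tables and join them with plain SQL
items.toDF().registerTempTable("items")
companies.toDF().registerTempTable("companies")
val joined = sqlContext.sql("SELECT * FROM items i JOIN companies c ON i.companyId = c.companyId")
joined.show()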

What is the difference between RDDs and paired RDDs?

Spark paired RDDs are simply RDDs whose elements are key-value pairs; unpaired RDDs can consist of objects of any type. However, paired RDDs support a few special operations, such as distributed "shuffle" operations and grouping or aggregating elements by key.
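A quick sketch of the distinction: any RDD of two-element tuples is treated as a pair RDD and gains the key-based operations (the word-count example here is illustrative, not from the original page):

val unpaired = sc.parallelize(Seq("a", "b", "a")) // RDD[String]: no key-based operations
val paired = unpaired.map(word => (word, 1)) // RDD[(String, Int)]: a pair RDD

// Key-based operations such as reduceByKey are only available on pair RDDs
paired.reduceByKey(_ + _).collect() // e.g. Array((a,2), (b,1))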


2 Answers

Soumya Simanta gave a good answer. However, the values in the joined RDD are Iterable, so the result may not look much like an ordinary table join.

Alternatively, you can:

val mappedItems = items.map(item => (item.companyId, item))
val mappedComp = companies.map(comp => (comp.companyId, comp))
mappedItems.join(mappedComp).take(10).foreach(println)

The output would be:

(c1,(Item(1,first,2,c1),Company(c1,company-1,city-1)))
(c1,(Item(2,second,2,c1),Company(c1,company-1,city-1)))
(c2,(Item(3,third,2,c2),Company(c2,company-2,city-2)))
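For completeness, a minimal sketch of the case classes and sample data this snippet assumes (the definitions below are inferred from the output and were not part of the original answer):

case class Item(itemId: Int, itemName: String, itemUnit: Int, companyId: String)
case class Company(companyId: String, companyName: String, companyCity: String)

val items = sc.parallelize(Seq(
  Item(1, "first", 2, "c1"), Item(2, "second", 2, "c1"), Item(3, "third", 2, "c2")))
val companies = sc.parallelize(Seq(
  Company("c1", "company-1", "city-1"), Company("c2", "company-2", "city-2")))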
answered Sep 16 '22 by viirya


(Using Scala) Let's say you have two RDDs:

  • emp: (empid, ename, dept)

  • dept: (dname, dept)

The following is another way:

//val emp = sc.parallelize(Seq((1,"jordan",10), (2,"ricky",20), (3,"matt",30), (4,"mince",35), (5,"rhonda",30)))
val emp = sc.parallelize(Seq(("jordan",10), ("ricky",20), ("matt",30), ("mince",35), ("rhonda",30)))

val dept = sc.parallelize(Seq(("hadoop",10), ("spark",20), ("hive",30), ("sqoop",40)))

//val shifted_fields_emp = emp.map(t => (t._3, t._1, t._2))
val shifted_fields_emp = emp.map(t => (t._2, t._1))

val shifted_fields_dept = dept.map(t => (t._2, t._1))

shifted_fields_emp.join(shifted_fields_dept)

Or, keeping all fields and using keyBy to choose the join key:

// Create emp RDD
val emp = sc.parallelize(Seq((1,"jordan",10), (2,"ricky",20), (3,"matt",30), (4,"mince",35), (5,"rhonda",30)))

// Create dept RDD
val dept = sc.parallelize(Seq(("hadoop",10), ("spark",20), ("hive",30), ("sqoop",40)))

// Establish that the third field is the key for the emp RDD
val manipulated_emp = emp.keyBy(t => t._3)

// Establish that the second field is the key for the dept RDD
val manipulated_dept = dept.keyBy(t => t._2)

// Inner join
val join_data = manipulated_emp.join(manipulated_dept)
// Left outer join
val left_outer_join_data = manipulated_emp.leftOuterJoin(manipulated_dept)
// Right outer join
val right_outer_join_data = manipulated_emp.rightOuterJoin(manipulated_dept)
// Full outer join
val full_outer_join_data = manipulated_emp.fullOuterJoin(manipulated_dept)

// Format the joined data for readability (using map)
val cleaned_joined_data = join_data.map(t => (t._2._1._1, t._2._1._2, t._1, t._2._2._1))

This gives the following output:

// Print cleaned_joined_data on the console
scala> cleaned_joined_data.collect()
res13: Array[(Int, String, Int, String)] = Array((3,matt,30,hive), (5,rhonda,30,hive), (2,ricky,20,spark), (1,jordan,10,hadoop))
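The outer joins wrap the possibly-missing side in an Option, so formatting their output needs one extra step. A sketch (the "no-dept" default is my own choice, not part of the original answer):

// Each value is ((empid, ename, dept), Option[(dname, dept)]); unmatched emps carry None
val cleaned_left_outer = left_outer_join_data.map { case (key, (e, d)) =>
  (e._1, e._2, key, d.map(_._1).getOrElse("no-dept"))
}
cleaned_left_outer.collect()
// e.g. Array((4,mince,35,no-dept), (3,matt,30,hive), ...)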
answered Sep 17 '22 by New Coder