How to join Datasets on multiple columns?

Given two Spark Datasets, A and B, I can do a join on a single column as follows:

a.joinWith(b, $"a.col" === $"b.col", "left")

My question is whether you can do a join using multiple columns. Essentially the equivalent of the following DataFrame API code:

a.join(b, a("col") === b("col") && a("col2") === b("col2"), "left")
asked Jun 16 '16 by d80tb7

People also ask

Can you join on multiple columns?

If you'd like to get data stored in tables joined by a compound key that's a primary key in one table and a foreign key in another table, simply use a join condition on multiple columns. In one joined table (in our example, enrollment), we have a primary key built from two columns (student_id and course_code).
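
The same idea carries over to Spark SQL. A minimal sketch, assuming a spark-shell session (SparkSession spark and implicits in scope); the payment table and its columns are made up purely for illustration, the point is the compound-key join condition:

// Made-up tables illustrating a compound key (student_id, course_code)
Seq((1, "CS101"), (2, "CS102")).toDF("student_id", "course_code")
  .createOrReplaceTempView("enrollment")
Seq((1, "CS101", 500.0), (2, "CS102", 250.0)).toDF("student_id", "course_code", "amount")
  .createOrReplaceTempView("payment")

// Join condition on multiple columns: both parts of the key must match
spark.sql("""
  SELECT e.student_id, e.course_code, p.amount
  FROM enrollment e
  JOIN payment p
    ON e.student_id = p.student_id
   AND e.course_code = p.course_code
""").show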

Can you join 3 columns in SQL?

Using JOIN in SQL doesn't mean you can only join two tables. You can join 3, 4, or even more! The possibilities are limitless.
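
The same goes for Spark's Scala API, where joins chain naturally. A quick sketch, again assuming a spark-shell session and made-up students, enrollment and courses tables:

// Made-up data purely for illustration
val students   = Seq((1, "Ann"), (2, "Bob")).toDF("student_id", "name")
val enrollment = Seq((1, "CS101"), (2, "CS102")).toDF("student_id", "course_code")
val courses    = Seq(("CS101", "Intro"), ("CS102", "Algorithms")).toDF("course_code", "title")

// Two chained joins combine the three tables
students
  .join(enrollment, "student_id")
  .join(courses, "course_code")
  .show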

How do you join tables with different columns in SQL?

Merging tables by columns. Multiple tables can be merged by columns in SQL using joins. Joins merge two tables based on the specified columns (generally, the primary key of one table and a foreign key of the other).


3 Answers

You can do it exactly the same way as with a DataFrame:

val xs = Seq(("a", "foo", 2.0), ("x", "bar", -1.0)).toDS
val ys = Seq(("a", "foo", 2.0), ("y", "bar", 1.0)).toDS

xs.joinWith(ys, xs("_1") === ys("_1") && xs("_2") === ys("_2"), "left").show
// +------------+-----------+
// |          _1|         _2|
// +------------+-----------+
// | [a,foo,2.0]|[a,foo,2.0]|
// |[x,bar,-1.0]|       null|
// +------------+-----------+

In Spark < 2.0.0 you can use something like this:

xs.as("xs").joinWith(
  ys.as("ys"), ($"xs._1" === $"ys._1") && ($"xs._2" === $"ys._2"), "left")
answered Oct 04 '22 by zero323

There's another way of joining: chaining one where clause after another. You first specify a join (and optionally its type) followed by one or more where operators, e.g.

scala> case class A(id: Long, name: String)
defined class A

scala> case class B(id: Long, name: String)
defined class B

scala> val as = Seq(A(0, "zero"), A(1, "one")).toDS
as: org.apache.spark.sql.Dataset[A] = [id: bigint, name: string]

scala> val bs = Seq(B(0, "zero"), B(1, "jeden")).toDS
bs: org.apache.spark.sql.Dataset[B] = [id: bigint, name: string]

scala> as.join(bs).where(as("id") === bs("id")).show
+---+----+---+-----+
| id|name| id| name|
+---+----+---+-----+
|  0|zero|  0| zero|
|  1| one|  1|jeden|
+---+----+---+-----+


scala> as.join(bs).where(as("id") === bs("id")).where(as("name") === bs("name")).show
+---+----+---+----+
| id|name| id|name|
+---+----+---+----+
|  0|zero|  0|zero|
+---+----+---+----+

The reason this is such a goodie is that the Spark optimizer will join (no pun intended) consecutive where clauses into a single join condition. Use the explain operator to see the underlying logical and physical plans.

scala> as.join(bs).where(as("id") === bs("id")).where(as("name") === bs("name")).explain(extended = true)
== Parsed Logical Plan ==
Filter (name#31 = name#36)
+- Filter (id#30L = id#35L)
   +- Join Inner
      :- LocalRelation [id#30L, name#31]
      +- LocalRelation [id#35L, name#36]

== Analyzed Logical Plan ==
id: bigint, name: string, id: bigint, name: string
Filter (name#31 = name#36)
+- Filter (id#30L = id#35L)
   +- Join Inner
      :- LocalRelation [id#30L, name#31]
      +- LocalRelation [id#35L, name#36]

== Optimized Logical Plan ==
Join Inner, ((name#31 = name#36) && (id#30L = id#35L))
:- Filter isnotnull(name#31)
:  +- LocalRelation [id#30L, name#31]
+- Filter isnotnull(name#36)
   +- LocalRelation [id#35L, name#36]

== Physical Plan ==
*BroadcastHashJoin [name#31, id#30L], [name#36, id#35L], Inner, BuildRight
:- *Filter isnotnull(name#31)
:  +- LocalTableScan [id#30L, name#31]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false], input[0, bigint, false]))
   +- *Filter isnotnull(name#36)
      +- LocalTableScan [id#35L, name#36]
answered Oct 04 '22 by Jacek Laskowski

In Java, the && operator does not work on Column objects. The correct way to join on multiple columns with Spark's Java API is as follows:

            Dataset<Row> datasetRf1 = joinedWithDays.join(
                    datasetFreq, 
                    datasetFreq.col("userId").equalTo(joinedWithDays.col("userId"))
                    .and(datasetFreq.col("artistId").equalTo(joinedWithDays.col("artistId"))),
                            "inner"
                    );

The and method plays the role of the && operator.

answered Oct 04 '22 by ForeverLearner