 

Spark Dataset API - join


I am trying to use the Spark Dataset API, but I am having some issues doing a simple join.

Let's say I have two datasets, each with the fields date | value. In the DataFrame case my join would look like:

val dfA : DataFrame
val dfB : DataFrame

dfA.join(dfB, dfB("date") === dfA("date"))

However, for Dataset there is the .joinWith method, and the same approach does not work:

val dfA : Dataset
val dfB : Dataset

dfA.joinWith(dfB, ? )

What is the argument required by .joinWith?

asked Apr 06 '16 by mastro


2 Answers

To use joinWith you first have to create a Dataset, and most likely two of them. To create a Dataset, you need to define a case class that matches your schema and call DataFrame.as[T], where T is your case class. So:

case class KeyValue(key: Int, value: String)

val df = Seq((1,"asdf"),(2,"34234")).toDF("key", "value")
val ds = df.as[KeyValue]
// org.apache.spark.sql.Dataset[KeyValue] = [key: int, value: string]

You could also skip the case class and use a tuple:

val tupDs = df.as[(Int,String)]
// org.apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string]

Then, if you had another case class / DataFrame, say like this:

case class Nums(key: Int, num1: Double, num2: Long)

val df2 = Seq((1,7.7,101L),(2,1.2,10L)).toDF("key","num1","num2")
val ds2 = df2.as[Nums]
// org.apache.spark.sql.Dataset[Nums] = [key: int, num1: double, num2: bigint]

Then, while the syntax of join and joinWith is similar, the results are different:

df.join(df2, df.col("key") === df2.col("key")).show
// +---+-----+---+----+----+
// |key|value|key|num1|num2|
// +---+-----+---+----+----+
// |  1| asdf|  1| 7.7| 101|
// |  2|34234|  2| 1.2|  10|
// +---+-----+---+----+----+

ds.joinWith(ds2, df.col("key") === df2.col("key")).show
// +---------+-----------+
// |       _1|         _2|
// +---------+-----------+
// | [1,asdf]|[1,7.7,101]|
// |[2,34234]| [2,1.2,10]|
// +---------+-----------+

As you can see, joinWith leaves the objects intact as the two halves of a tuple, while join flattens the columns into a single namespace (which will cause problems in the above case, because the column name "key" is repeated).
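To make the difference concrete, here is a minimal sketch (assuming the ds, ds2, df and df2 values defined above, plus spark.implicits._ in scope for the encoders) of consuming the typed pairs that joinWith returns:

// Each row of the joinWith result is a typed (KeyValue, Nums) pair,
// so fields are accessed on real objects -- no ambiguity about
// which "key" column is meant.
val joined = ds.joinWith(ds2, df.col("key") === df2.col("key"))

val summed = joined.map { case (kv, nums) =>
  (kv.key, kv.value, nums.num1 + nums.num2)
}
summed.show()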

Curiously enough, I have to use df.col("key") and df2.col("key") to create the conditions for joining ds and ds2 -- plain col("key") does not resolve on either side, and ds.col(...) doesn't exist. Using the original df.col("key") does the trick, however.
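As a hedged alternative, the aliasing trick from the answer below should also work for this key-based example, so the original DataFrames never need to be referenced (again assuming spark.implicits._ is imported for the $"..." syntax):

// Alias each Dataset and qualify the join columns through the alias.
ds.as("a").joinWith(ds2.as("b"), $"a.key" === $"b.key").show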

answered Oct 06 '22 by David Griffin


From https://docs.cloud.databricks.com/docs/latest/databricks_guide/05%20Spark/1%20Intro%20Datasets.html

it looks like you could just do:

dfA.as("A").joinWith(dfB.as("B"), $"A.date" === $"B.date" ) 
answered Oct 05 '22 by Raghuram Onti Srinivasan