I am trying to use the Spark Dataset API, but I am having some issues doing a simple join.
Let's say I have two datasets with the fields date | value; in the case of DataFrame my join would look like:
val dfA : DataFrame
val dfB : DataFrame
dfA.join(dfB, dfB("date") === dfA("date"))
However, for Dataset there is the .joinWith method, but the same approach does not work:
val dfA : Dataset
val dfB : Dataset
dfA.joinWith(dfB, ? )
What is the argument required by .joinWith?
To use joinWith you first have to create a Dataset, and most likely two of them. To create a Dataset, you need to create a case class that matches your schema and call DataFrame.as[T], where T is your case class. So:
import spark.implicits._ // for toDF and .as[T]; assumes a SparkSession named spark

case class KeyValue(key: Int, value: String)
val df = Seq((1,"asdf"),(2,"34234")).toDF("key", "value")
val ds = df.as[KeyValue]
// org.apache.spark.sql.Dataset[KeyValue] = [key: int, value: string]
You could also skip the case class and use a tuple:
val tupDs = df.as[(Int,String)]
// org.apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string]
Then if you had another case class / DF, say like this:
case class Nums(key: Int, num1: Double, num2: Long)
val df2 = Seq((1,7.7,101L),(2,1.2,10L)).toDF("key","num1","num2")
val ds2 = df2.as[Nums]
// org.apache.spark.sql.Dataset[Nums] = [key: int, num1: double, num2: bigint]
Then, while the syntax of join and joinWith is similar, the results are different:
df.join(df2, df.col("key") === df2.col("key")).show
// +---+-----+---+----+----+
// |key|value|key|num1|num2|
// +---+-----+---+----+----+
// |  1| asdf|  1| 7.7| 101|
// |  2|34234|  2| 1.2|  10|
// +---+-----+---+----+----+

ds.joinWith(ds2, df.col("key") === df2.col("key")).show
// +---------+-----------+
// |       _1|         _2|
// +---------+-----------+
// | [1,asdf]|[1,7.7,101]|
// |[2,34234]| [2,1.2,10]|
// +---------+-----------+
As you can see, joinWith leaves the objects intact as parts of a tuple, while join flattens out the columns into a single namespace. (Which will cause problems in the above case because the column name "key" is repeated.)
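If you want the flat join but without the duplicated column, one option is to join on the column name rather than on an expression, so "key" only appears once. This is a sketch, assuming a Spark version that has the Seq-of-column-names join overload:

// Join on the column name so "key" shows up only once in the result
df.join(df2, Seq("key")).show
// +---+-----+----+----+
// |key|value|num1|num2|
// +---+-----+----+----+
// |  1| asdf| 7.7| 101|
// |  2|34234| 1.2|  10|
// +---+-----+----+----+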
Curiously enough, I have to use df.col("key") and df2.col("key") to create the conditions for joining ds and ds2 -- if you use just col("key") on either side it does not work, and ds.col(...) doesn't exist. Using the original df.col("key") does the trick, however.
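As a side note on what you get back: joinWith returns a Dataset of pairs, so each side keeps its type. The mapping step below is my own sketch (not part of the original answer), reusing ds and ds2 from above:

val joined = ds.joinWith(ds2, df.col("key") === df2.col("key"))
// joined: org.apache.spark.sql.Dataset[(KeyValue, Nums)]

// Work with the typed objects directly instead of column names
val combined = joined.map { case (kv, nums) => (kv.key, kv.value, nums.num1 + nums.num2) }
// combined: org.apache.spark.sql.Dataset[(Int, String, Double)]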
From https://docs.cloud.databricks.com/docs/latest/databricks_guide/05%20Spark/1%20Intro%20Datasets.html it looks like you could just do:
dfA.as("A").joinWith(dfB.as("B"), $"A.date" === $"B.date" )