I am trying to get the latest records from a table using a self join. It works with spark-sql but not with the Spark DataFrame API.
Can anyone help? Is it a bug?
I am using Spark 2.2.0 in local mode.
Creating the input DataFrame:
scala> val df3 = spark.sparkContext.parallelize(Array((1,"a",1),(1,"aa",2),(2,"b",2),(2,"bb",5))).toDF("id","value","time")
df3: org.apache.spark.sql.DataFrame = [id: int, value: string ... 1 more field]
scala> val df33 = df3
df33: org.apache.spark.sql.DataFrame = [id: int, value: string ... 1 more field]
scala> df3.show
+---+-----+----+
| id|value|time|
+---+-----+----+
| 1| a| 1|
| 1| aa| 2|
| 2| b| 2|
| 2| bb| 5|
+---+-----+----+
scala> df33.show
+---+-----+----+
| id|value|time|
+---+-----+----+
| 1| a| 1|
| 1| aa| 2|
| 2| b| 2|
| 2| bb| 5|
+---+-----+----+
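(For the SQL query below to resolve df3 and df33 as table names, the DataFrames presumably were registered as temporary views first; that step isn't shown in the transcript, e.g.:)
scala> // assumed step, not shown in the original session, but required for the SQL below
scala> df3.createOrReplaceTempView("df3")
scala> df33.createOrReplaceTempView("df33")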
Now performing the join using SQL: works
scala> spark.sql("select df33.* from df3 join df33 on df3.id = df33.id and df3.time < df33.time").show
+---+-----+----+
| id|value|time|
+---+-----+----+
| 1| aa| 2|
| 2| bb| 5|
+---+-----+----+
Now performing the join using the DataFrame API: doesn't work
scala> df3.join(df33, (df3.col("id") === df33.col("id")) && (df3.col("time") < df33.col("time")) ).select(df33.col("id"),df33.col("value"),df33.col("time")).show
+---+-----+----+
| id|value|time|
+---+-----+----+
+---+-----+----+
The thing to notice is the explain plans: the DataFrame API one collapses to an empty LocalTableScan!
scala> df3.join(df33, (df3.col("id") === df33.col("id")) && (df3.col("time") < df33.col("time")) ).select(df33.col("id"),df33.col("value"),df33.col("time")).explain
== Physical Plan ==
LocalTableScan <empty>, [id#150, value#151, time#152]
scala> spark.sql("select df33.* from df3 join df33 on df3.id = df33.id and df3.time < df33.time").explain
== Physical Plan ==
*Project [id#1241, value#1242, time#1243]
+- *SortMergeJoin [id#150], [id#1241], Inner, (time#152 < time#1243)
:- *Sort [id#150 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#150, 200)
: +- *Project [_1#146 AS id#150, _3#148 AS time#152]
: +- *SerializeFromObject [assertnotnull(input[0, scala.Tuple3, true])._1 AS _1#146, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0, scala.Tuple3, true])._2, true) AS _2#147, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#148]
: +- Scan ExternalRDDScan[obj#145]
+- *Sort [id#1241 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#1241, 200)
+- *Project [_1#146 AS id#1241, _2#147 AS value#1242, _3#148 AS time#1243]
+- *SerializeFromObject [assertnotnull(input[0, scala.Tuple3, true])._1 AS _1#146, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0, scala.Tuple3, true])._2, true) AS _2#147, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#148]
+- Scan ExternalRDDScan[obj#145]
No, that's not a bug. When you reassign the DataFrame to a new name like that, it shares the same lineage and doesn't duplicate the data, so df3 and df33 resolve to exactly the same columns and you end up comparing each column with itself.
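A minimal sketch of the effect, using the DataFrames from the question:
// df33 is literally the same DataFrame object, just bound to a second name,
// so both Column objects in the condition resolve to the same attributes
df3 eq df33   // Boolean = true
// the condition is effectively `time#152 < time#152`, which can never hold,
// hence the empty LocalTableScan in the physical plan above
df3.join(df33, df3.col("id") === df33.col("id") && df3.col("time") < df33.col("time")).count   // 0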
Using spark.sql is slightly different because it is actually working on aliases (the registered views) of your DataFrames.
So the correct way to perform a self-join with the DataFrame API is to alias your DataFrame as well, as follows:
// in spark-shell the implicits are already in scope; otherwise you also need:
// import spark.implicits._
val df1 = Seq((1,"a",1),(1,"aa",2),(2,"b",2),(2,"bb",5)).toDF("id","value","time")
df1.as("df1").join(df1.as("df2"), $"df1.id" === $"df2.id" && $"df1.time" < $"df2.time").select($"df2.*").show
// +---+-----+----+
// | id|value|time|
// +---+-----+----+
// | 1| aa| 2|
// | 2| bb| 5|
// +---+-----+----+
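If all you actually need is the latest row per id, a window function gives the same result without a self-join; a quick sketch using the same df1:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{row_number, desc, col}

val w = Window.partitionBy("id").orderBy(desc("time"))
df1.withColumn("rn", row_number().over(w)).where(col("rn") === 1).drop("rn").show
// yields the same two rows as above: (1, aa, 2) and (2, bb, 5)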
For more information about self-joins, I recommend reading High Performance Spark by Holden Karau and Rachel Warren, Chapter 4.