
Self-join not working as expected with the DataFrame API

I am trying to get the latest records from a table using a self join. It works with Spark SQL (spark.sql) but does not work with the Spark DataFrame API.

Can anyone help? Is it a bug?

I am using Spark 2.2.0 in local mode.

Creating input DataFrame:

scala> val df3 = spark.sparkContext.parallelize(Array((1,"a",1),(1,"aa",2),(2,"b",2),(2,"bb",5))).toDF("id","value","time")
df3: org.apache.spark.sql.DataFrame = [id: int, value: string ... 1 more field]    

scala> val df33 = df3
df33: org.apache.spark.sql.DataFrame = [id: int, value: string ... 1 more field]

scala> df3.show
+---+-----+----+
| id|value|time|
+---+-----+----+
|  1|    a|   1|
|  1|   aa|   2|
|  2|    b|   2|
|  2|   bb|   5|
+---+-----+----+

scala> df33.show
+---+-----+----+
| id|value|time|
+---+-----+----+
|  1|    a|   1|
|  1|   aa|   2|
|  2|    b|   2|
|  2|   bb|   5|
+---+-----+----+
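
Note: for the SQL query below to resolve df3 and df33 by name, both DataFrames first have to be registered as temporary views, for example:

scala> df3.createOrReplaceTempView("df3")

scala> df33.createOrReplaceTempView("df33")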

Performing the join using SQL works:

scala> spark.sql("select df33.* from df3 join df33 on df3.id = df33.id and df3.time < df33.time").show
+---+-----+----+
| id|value|time|
+---+-----+----+
|  1|   aa|   2|
|  2|   bb|   5|
+---+-----+----+

Performing the same join using the DataFrame API does not work:

scala> df3.join(df33, (df3.col("id") === df33.col("id")) && (df3.col("time") < df33.col("time")) ).select(df33.col("id"),df33.col("value"),df33.col("time")).show
+---+-----+----+
| id|value|time|
+---+-----+----+
+---+-----+----+

The thing to notice is the explain plans: the DataFrame API query collapses to an empty LocalTableScan!

scala> df3.join(df33, (df3.col("id") === df33.col("id")) && (df3.col("time") < df33.col("time")) ).select(df33.col("id"),df33.col("value"),df33.col("time")).explain
== Physical Plan ==
LocalTableScan <empty>, [id#150, value#151, time#152]

scala> spark.sql("select df33.* from df3 join df33 on df3.id = df33.id and df3.time < df33.time").explain
== Physical Plan ==
*Project [id#1241, value#1242, time#1243]
+- *SortMergeJoin [id#150], [id#1241], Inner, (time#152 < time#1243)
   :- *Sort [id#150 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#150, 200)
   :     +- *Project [_1#146 AS id#150, _3#148 AS time#152]
   :        +- *SerializeFromObject [assertnotnull(input[0, scala.Tuple3, true])._1 AS _1#146, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0, scala.Tuple3, true])._2, true) AS _2#147, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#148]
   :           +- Scan ExternalRDDScan[obj#145]
   +- *Sort [id#1241 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#1241, 200)
         +- *Project [_1#146 AS id#1241, _2#147 AS value#1242, _3#148 AS time#1243]
            +- *SerializeFromObject [assertnotnull(input[0, scala.Tuple3, true])._1 AS _1#146, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0, scala.Tuple3, true])._2, true) AS _2#147, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#148]
               +- Scan ExternalRDDScan[obj#145]


1 Answer

No, that's not a bug. When you reassign the DataFrame to a new variable as you have done, you only copy a reference to the same lineage; neither the data nor the logical plan is duplicated. As a result, df3.col("time") and df33.col("time") resolve to exactly the same column, so your join condition compares a column with itself.
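
You can see this in the same session by printing the join condition you built (a quick sketch; the exact rendering may differ): both sides of each comparison resolve to the same attribute.

println((df3.col("id") === df33.col("id")) && (df3.col("time") < df33.col("time")))
// prints something like: ((id = id) AND (time < time))
// i.e. the time predicate compares the column with itself and can never be true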

Using spark.sql is slightly different because the query works on aliases (the registered view names) of your DataFrames, so the analyzer gives the two sides of the join distinct attribute IDs (note id#150 versus id#1241 in the SQL plan above).

So the correct way to perform a self-join with the DataFrame API is to alias your DataFrame, as follows:

val df1 = Seq((1,"a",1),(1,"aa",2),(2,"b",2),(2,"bb",5)).toDF("id","value","time")

df1.as("df1").join(df1.as("df2"), $"df1.id" === $"df2.id" && $"df1.time" < $"df2.time").select($"df2.*").show
// +---+-----+----+
// | id|value|time|
// +---+-----+----+
// |  1|   aa|   2|
// |  2|   bb|   5|
// +---+-----+----+
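
As a sanity check (a sketch in the same session), explaining the aliased query should now show a real join over two distinct sides instead of the empty LocalTableScan produced by the un-aliased version:

df1.as("df1").join(df1.as("df2"), $"df1.id" === $"df2.id" && $"df1.time" < $"df2.time").select($"df2.*").explain
// the physical plan now contains an actual join (e.g. SortMergeJoin) over two scans,
// much like the plan of the spark.sql query in the question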

For more information about self-joins, I recommend Chapter 4 of High Performance Spark by Holden Karau and Rachel Warren.
