Update: the root issue was a bug which was fixed in Spark 3.2.0.
Input df structures are identical in both runs, but the outputs are different. Only the second run returns the desired result (df6). I know I can use aliases for the dataframes, which would return the desired result (sketched below).
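For reference, a minimal sketch of that alias workaround, applied to the first run's df1 and df2 (the alias names left and right are just illustrative):
from pyspark.sql import functions as F

# Give each side of the self-join its own alias so the join condition can
# reference the columns unambiguously.
left = df2.alias('left')
right = df1.alias('right')
df3_fixed = left.join(right, F.col('right.c1') == F.col('left.c2'), 'full')
df3_fixed.show()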
The question: what are the underlying Spark mechanics in creating df3? Spark reads df1.c1 == df2.c2 in the join's on clause, but it evidently does not pay attention to the dataframes provided. What's under the hood there? How can such behaviour be anticipated?
First run (incorrect df3 result):
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

data = [
(1, 'bad', 'A'),
(4, 'ok', None)]
df1 = spark.createDataFrame(data, ['ID', 'Status', 'c1'])
df1 = df1.withColumn('c2', F.lit('A'))
df1.show()
#+---+------+----+---+
#| ID|Status| c1| c2|
#+---+------+----+---+
#| 1| bad| A| A|
#| 4| ok|null| A|
#+---+------+----+---+
df2 = df1.filter((F.col('Status') == 'ok'))
df2.show()
#+---+------+----+---+
#| ID|Status| c1| c2|
#+---+------+----+---+
#| 4| ok|null| A|
#+---+------+----+---+
df3 = df2.join(df1, (df1.c1 == df2.c2), 'full')
df3.show()
#+----+------+----+----+----+------+----+----+
#| ID|Status| c1| c2| ID|Status| c1| c2|
#+----+------+----+----+----+------+----+----+
#| 4| ok|null| A|null| null|null|null|
#|null| null|null|null| 1| bad| A| A|
#|null| null|null|null| 4| ok|null| A|
#+----+------+----+----+----+------+----+----+
Second run (correct df6 result):
data = [
(1, 'bad', 'A', 'A'),
(4, 'ok', None, 'A')]
df4 = spark.createDataFrame(data, ['ID', 'Status', 'c1', 'c2'])
df4.show()
#+---+------+----+---+
#| ID|Status| c1| c2|
#+---+------+----+---+
#| 1| bad| A| A|
#| 4| ok|null| A|
#+---+------+----+---+
df5 = spark.createDataFrame(data, ['ID', 'Status', 'c1', 'c2']).filter((F.col('Status') == 'ok'))
df5.show()
#+---+------+----+---+
#| ID|Status| c1| c2|
#+---+------+----+---+
#| 4| ok|null| A|
#+---+------+----+---+
df6 = df5.join(df4, (df4.c1 == df5.c2), 'full')
df6.show()
#+----+------+----+----+---+------+----+---+
#| ID|Status| c1| c2| ID|Status| c1| c2|
#+----+------+----+----+---+------+----+---+
#|null| null|null|null| 4| ok|null| A|
#| 4| ok|null| A| 1| bad| A| A|
#+----+------+----+----+---+------+----+---+
I can see that the physical plans are different in that different join implementations are used internally (BroadcastNestedLoopJoin and SortMergeJoin). But this by itself does not explain why the results differ, as they should still be the same regardless of the internal join type.
df3.explain()
== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, FullOuter, (c1#23335 = A)
:- *(1) Project [ID#23333L, Status#23334, c1#23335, A AS c2#23339]
: +- *(1) Filter (isnotnull(Status#23334) AND (Status#23334 = ok))
: +- *(1) Scan ExistingRDD[ID#23333L,Status#23334,c1#23335]
+- BroadcastExchange IdentityBroadcastMode, [id=#9250]
+- *(2) Project [ID#23379L, Status#23380, c1#23381, A AS c2#23378]
+- *(2) Scan ExistingRDD[ID#23379L,Status#23380,c1#23381]
df6.explain()
== Physical Plan ==
SortMergeJoin [c2#23459], [c1#23433], FullOuter
:- *(2) Sort [c2#23459 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(c2#23459, 200), ENSURE_REQUIREMENTS, [id=#9347]
: +- *(1) Filter (isnotnull(Status#23457) AND (Status#23457 = ok))
: +- *(1) Scan ExistingRDD[ID#23456L,Status#23457,c1#23458,c2#23459]
+- *(4) Sort [c1#23433 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(c1#23433, 200), ENSURE_REQUIREMENTS, [id=#9352]
+- *(3) Scan ExistingRDD[ID#23431L,Status#23432,c1#23433,c2#23434]
Joins depend on the structure of the joined dataframes, but how you built those dataframes matters too. If the two dataframes you join share the same lineage, you can run into ambiguous column issues in the join condition, leading to what you're describing in your question.
In your first run, because you built df2 from df1, the two dataframes share the same lineage. When you join them, you're actually doing a self-join, with Spark resolving the join condition to columns belonging to only one of the joined dataframes, resulting in a cartesian product followed by an always-false filter.
In your second run, the two dataframes were built independently, so the join condition is correctly resolved as an equality between two columns, each belonging to a different dataframe. Thus Spark performs a classic join.
As pltc explains in his answer, in your first run Spark does not select the right columns for your join. Let's find out why.
Let's start by getting the physical plans of df1 and df2 using explain. Here is the physical plan for df1:
== Physical Plan ==
*(1) Project [ID#0L, Status#1, c1#2, A AS c2#6]
+- *(1) Scan ExistingRDD[ID#0L,Status#1,c1#2]
And here is the physical plan for df2:
== Physical Plan ==
*(1) Project [ID#0L, Status#1, c1#2, A AS c2#6]
+- *(1) Filter (isnotnull(Status#1) AND (Status#1 = ok))
+- *(1) Scan ExistingRDD[ID#0L,Status#1,c1#2]
You can see in the first line, starting with (1) Project, that the two dataframes df1 and df2 have the same column names and ids: [ID#0L, Status#1, c1#2, A AS c2#6]. This is not surprising because df2 was created from df1, so you can see df2 as df1 with additional transformations. So we have the following references:
df1.c1 <=> df2.c1 <=> c1#2
df1.c2 <=> df2.c2 <=> A AS c2#6
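If you want to verify these ids yourself, extended explain prints all the plans, where the shared expression ids appear for both dataframes (a quick check, assuming df1 and df2 from the first run):
# Extended explain prints parsed/analyzed/optimized/physical plans;
# the shared expression ids (e.g. c1#2, c2#6) show up for both dataframes.
df1.explain(True)
df2.explain(True)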
And when you join df1 and df2, you are actually doing a self-join. All of the following spellings of your condition will be translated to c1#2 = A AS c2#6, which simplifies to the join condition c1#2 = A:
df1.c1 = df2.c2
df1.c2 = df2.c1
df2.c1 = df1.c2
df2.c2 = df1.c1
When you perform a self-join in Spark, Spark regenerates the column ids of the right dataframe to avoid having the same column ids on both sides of the final dataframe. So in your case it rewrites the column ids of df1, and column c1#2 ends up referring to column c1 of df2.
Now your condition doesn't contain any columns from df1, so Spark chooses a cartesian product as the join strategy. As one of the two dataframes is small enough to be broadcast, the selected algorithm is BroadcastNestedLoopJoin. This is what the physical plan of df3 shows:
== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, FullOuter, (c1#2 = A)
:- *(1) Project [ID#0L, Status#1, c1#2, A AS c2#6]
: +- *(1) Filter (isnotnull(Status#1) AND (Status#1 = ok))
: +- *(1) Scan ExistingRDD[ID#0L,Status#1,c1#2]
+- BroadcastExchange IdentityBroadcastMode, [id=#75]
+- *(2) Project [ID#46L, Status#47, c1#48, A AS c2#45]
+- *(2) Scan ExistingRDD[ID#46L,Status#47,c1#48]
Note that the four new column ids of df1 are now [ID#46L, Status#47, c1#48, A AS c2#45].
When you execute this plan, for the single row of df2 the value of c1 is null, which is different from A, so the join condition is always false. As you chose a full outer join, you get three rows (two from df1, one from df2) with null in the columns coming from the other dataframe:
+----+------+----+----+----+------+----+----+
| ID|Status| c1| c2| ID|Status| c1| c2|
+----+------+----+----+----+------+----+----+
| 4| ok|null| A|null| null|null|null|
|null| null|null|null| 1| bad| A| A|
|null| null|null|null| 4| ok|null| A|
+----+------+----+----+----+------+----+----+
For the second run, you create two independent dataframes. So if we look at the physical plans of df4 and df5, you can see that the column ids are different. Here is the physical plan of df4:
== Physical Plan ==
*(1) Scan ExistingRDD[ID#98L,Status#99,c1#100,c2#101]
And here is the physical plan of df5:
== Physical Plan ==
*(1) Filter (isnotnull(Status#124) AND (Status#124 = ok))
+- *(1) Scan ExistingRDD[ID#123L,Status#124,c1#125,c2#126]
Your join condition is c1#100 = c2#126, where c1#100 is the c1 column from df4 and c2#126 is the c2 column from df5. Each side of the equality in the join condition comes from a different dataframe, so Spark can perform the join as you expected.
Since Spark 3.0, Spark checks that the columns you're using for a join are not ambiguous. If you inverted the order of df2 and df1 when joining them, as follows:
df3 = df1.join(df2, (df1.c1 == df2.c2), 'full')
you would get the following error:
pyspark.sql.utils.AnalysisException: Column c2#6 are ambiguous.
So why don't we have this error when executing df2.join(df1, ...)?
You have your answer in the file DetectAmbiguousSelfJoin in Spark's code:
// When self-join happens, the analyzer asks the right side plan to generate
// attributes with new exprIds. If a plan of a Dataset outputs an attribute which
// is referred by a column reference, and this attribute has different exprId than
// the attribute of column reference, then the column reference is ambiguous, as it
// refers to a column that gets regenerated by self-join.
It means that when doing df2.join(df1, ...), only the columns used in the join condition are checked against df1. As in our case we didn't perform any transformation on df1, unlike df2, which was filtered, the exprIds of df1's columns didn't change and thus no ambiguous-column error is raised.
I've created an issue on Spark Jira about this behaviour, see SPARK-36874 (the bug was fixed in version 3.2.0).
You have to be very careful about whether your join is a self-join. If you start from a dataframe df1, perform some transformations on it to get df2, and then join df1 and df2, you risk getting this kind of behaviour. To mitigate it, you should always put the original dataframe first when doing a join, so df1.join(df2, ...) instead of df2.join(df1, ...). By doing so, you will get an AnalysisException: Column x are ambiguous if Spark doesn't manage to select the right columns, as sketched below.
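A minimal sketch of that advice, using the first run's df1 and df2 on Spark 3.0/3.1 (where the behaviour described above applies):
from pyspark.sql.utils import AnalysisException

# With the original dataframe first, Spark 3.0/3.1 fails fast with the
# ambiguity error instead of silently building the wrong join condition.
try:
    df1.join(df2, df1.c1 == df2.c2, 'full').show()
except AnalysisException as e:
    print(e)  # Column c2#... are ambiguous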