
Why does joining structurally identical dataframes give different results?

Update: the root issue was a bug which was fixed in Spark 3.2.0.


Input df structures are identical in both runs, but the outputs are different. Only the second run returns the desired result (df6). I know I can use aliases for the dataframes, which would return the desired result.

The question: what are the underlying Spark mechanics in creating df3? Spark reads df1.c1 == df2.c2 in the join's on clause, but it evidently does not pay attention to which dataframes the columns come from. What's under the hood there? How can such behaviour be anticipated?

First run (incorrect df3 result):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

data = [
    (1, 'bad', 'A'),
    (4, 'ok', None)]
df1 = spark.createDataFrame(data, ['ID', 'Status', 'c1'])
df1 = df1.withColumn('c2', F.lit('A'))
df1.show()

#+---+------+----+---+
#| ID|Status|  c1| c2|
#+---+------+----+---+
#|  1|   bad|   A|  A|
#|  4|    ok|null|  A|
#+---+------+----+---+

df2 = df1.filter((F.col('Status') == 'ok'))
df2.show()

#+---+------+----+---+
#| ID|Status|  c1| c2|
#+---+------+----+---+
#|  4|    ok|null|  A|
#+---+------+----+---+

df3 = df2.join(df1, (df1.c1 == df2.c2), 'full')
df3.show()

#+----+------+----+----+----+------+----+----+
#|  ID|Status|  c1|  c2|  ID|Status|  c1|  c2|
#+----+------+----+----+----+------+----+----+
#|   4|    ok|null|   A|null|  null|null|null|
#|null|  null|null|null|   1|   bad|   A|   A|
#|null|  null|null|null|   4|    ok|null|   A|
#+----+------+----+----+----+------+----+----+

Second run (correct df6 result):

data = [
    (1, 'bad', 'A', 'A'),
    (4, 'ok', None, 'A')]
df4 = spark.createDataFrame(data, ['ID', 'Status', 'c1', 'c2'])
df4.show()

#+---+------+----+---+
#| ID|Status|  c1| c2|
#+---+------+----+---+
#|  1|   bad|   A|  A|
#|  4|    ok|null|  A|
#+---+------+----+---+

df5 = spark.createDataFrame(data, ['ID', 'Status', 'c1', 'c2']).filter((F.col('Status') == 'ok'))
df5.show()

#+---+------+----+---+
#| ID|Status|  c1| c2|
#+---+------+----+---+
#|  4|    ok|null|  A|
#+---+------+----+---+

df6 = df5.join(df4, (df4.c1 == df5.c2), 'full')
df6.show()

#+----+------+----+----+---+------+----+---+
#|  ID|Status|  c1|  c2| ID|Status|  c1| c2|
#+----+------+----+----+---+------+----+---+
#|null|  null|null|null|  4|    ok|null|  A|
#|   4|    ok|null|   A|  1|   bad|   A|  A|
#+----+------+----+----+---+------+----+---+

I can see that the physical plans differ in that different join implementations are used internally (BroadcastNestedLoopJoin vs SortMergeJoin). But this by itself does not explain why the results differ, as they should be the same regardless of the internal join type.

df3.explain()

== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, FullOuter, (c1#23335 = A)
:- *(1) Project [ID#23333L, Status#23334, c1#23335, A AS c2#23339]
:  +- *(1) Filter (isnotnull(Status#23334) AND (Status#23334 = ok))
:     +- *(1) Scan ExistingRDD[ID#23333L,Status#23334,c1#23335]
+- BroadcastExchange IdentityBroadcastMode, [id=#9250]
   +- *(2) Project [ID#23379L, Status#23380, c1#23381, A AS c2#23378]
      +- *(2) Scan ExistingRDD[ID#23379L,Status#23380,c1#23381]

df6.explain()

== Physical Plan ==
SortMergeJoin [c2#23459], [c1#23433], FullOuter
:- *(2) Sort [c2#23459 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(c2#23459, 200), ENSURE_REQUIREMENTS, [id=#9347]
:     +- *(1) Filter (isnotnull(Status#23457) AND (Status#23457 = ok))
:        +- *(1) Scan ExistingRDD[ID#23456L,Status#23457,c1#23458,c2#23459]
+- *(4) Sort [c1#23433 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(c1#23433, 200), ENSURE_REQUIREMENTS, [id=#9352]
      +- *(3) Scan ExistingRDD[ID#23431L,Status#23432,c1#23433,c2#23434]
Asked Sep 24 '21 by ZygD

1 Answer

Joins depend on the structure of the joined dataframes, but how you built those dataframes has an influence too. If the two dataframes you join share the same lineage, you can run into ambiguous-column issues in the join condition, leading to what you're describing in your question.

In your first run, because you built df2 from df1, the two dataframes share the same lineage. When you join them, you're actually doing a self-join, with Spark resolving the join condition to columns belonging to only one of the joined dataframes, which results in a cartesian product followed by an always-false filter.

In your second run, since the two dataframes were built independently, the join condition is correctly defined as an equality between two columns, each belonging to a different dataframe, and Spark performs a classic join.


Detailed explanation

As pltc explains in his answer, in your first run Spark does not select the right columns for your join. Let's find out why.

What's under the hood?

Let's start by getting the physical plans of df1 and df2 using explain. Here is the physical plan for df1:

== Physical Plan ==
*(1) Project [ID#0L, Status#1, c1#2, A AS c2#6]
+- *(1) Scan ExistingRDD[ID#0L,Status#1,c1#2]

And here is the physical plan for df2:

== Physical Plan ==
*(1) Project [ID#0L, Status#1, c1#2, A AS c2#6]
+- *(1) Filter (isnotnull(Status#1) AND (Status#1 = ok))
   +- *(1) Scan ExistingRDD[ID#0L,Status#1,c1#2]

You can see on the first line, starting with (1) Project, that the two dataframes df1 and df2 have the same column names and ids: [ID#0L, Status#1, c1#2, A AS c2#6]. This is not surprising because df2 was created from df1, so you can see df2 as df1 with additional transformations. So we have the following references (the short check after the list shows they are interchangeable):

  • df1.c1 <=> df2.c1 <=> c1#2
  • df1.c2 <=> df2.c2 <=> A AS c2#6
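
A quick way to see that these really are the same attributes (a minimal sketch, not part of the original answer, assuming the df1/df2 from the first run are still in scope) is that a column taken from one dataframe resolves without error against the other:

# Both of these run fine, because df1.Status and df2.c1 point to the very same
# attributes as df2.Status and df1.c1 -- the two dataframes share one lineage.
df2.select(df1.Status).show()   # a "df1" column used against df2
df1.select(df2.c1).show()       # and vice versa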

And when you join df1 and df2, you are actually doing a self-join. All of the following variants of your condition will be translated to c1#2 = A AS c2#6, which leaves you with the simplified join condition c1#2 = A:

  • df1.c1 = df2.c2
  • df1.c2 = df2.c1
  • df2.c1 = df1.c2
  • df2.c2 = df1.c1

When you perform a self-join in Spark, Spark regenerates the column ids of the right dataframe to avoid ending up with duplicate column ids in the final dataframe. In your case it therefore rewrites the column ids of df1, which is the right side of df2.join(df1, ...). As a result, column c1#2 now refers only to the c1 column of df2.

Since your condition no longer references any column of df1, Spark chooses a cartesian product as the join strategy. As one of the two dataframes is small enough to be broadcast, the selected algorithm is BroadcastNestedLoopJoin. This is what the physical plan of df3 shows:

== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, FullOuter, (c1#2 = A)
:- *(1) Project [ID#0L, Status#1, c1#2, A AS c2#6]
:  +- *(1) Filter (isnotnull(Status#1) AND (Status#1 = ok))
:     +- *(1) Scan ExistingRDD[ID#0L,Status#1,c1#2]
+- BroadcastExchange IdentityBroadcastMode, [id=#75]
   +- *(2) Project [ID#46L, Status#47, c1#48, A AS c2#45]
      +- *(2) Scan ExistingRDD[ID#46L,Status#47,c1#48]

Note that the four new column ids of df1 are now [ID#46L, Status#47, c1#48, A AS c2#45].

And when this plan is executed, the only row of df2 has c1 = null, which is not equal to A, so the join condition is always false. As you chose a full outer join, you get three rows (two from df1, one from df2), each with nulls in the columns coming from the other dataframe:

+----+------+----+----+----+------+----+----+
|  ID|Status|  c1|  c2|  ID|Status|  c1|  c2|
+----+------+----+----+----+------+----+----+
|   4|    ok|null|   A|null|  null|null|null|
|null|  null|null|null|   1|   bad|   A|   A|
|null|  null|null|null|   4|    ok|null|   A|
+----+------+----+----+----+------+----+----+
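
To see that the df3 output above is nothing more than an always-false full outer join, here is a minimal sketch (not part of the original answer) that reproduces the same shape using the independent df4 and df5 from the question's second run:

# A full outer join whose condition is never true degenerates into
# "all left rows + all right rows", each padded with nulls on the other side.
# (On older Spark versions you may need spark.sql.crossJoin.enabled=true.)
df5.join(df4, F.lit(False), 'full').show()
# Produces 3 rows: 1 from df5 and 2 from df4, mirroring the df3 result above.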

Why does the second run give the desired output?

For the second run, you created two independent dataframes. So if we look at the physical plans of df4 and df5, we can see that their column ids are different. Here is the physical plan of df4:

== Physical Plan ==
*(1) Scan ExistingRDD[ID#98L,Status#99,c1#100,c2#101]

And here is the physical plan of df5:

== Physical Plan ==
*(1) Filter (isnotnull(Status#124) AND (Status#124 = ok))
+- *(1) Scan ExistingRDD[ID#123L,Status#124,c1#125,c2#126]

Your join condition is c1#100 = c2#126, where c1#100 is the c1 column from df4 and c2#126 is the c2 column from df5. Each side of the equality in the join condition belongs to a different dataframe, so Spark can perform the join as you expected.

Why is this not detected as an ambiguous self-join?

Since Spark 3.0, Spark checks that the columns you're using in a join are not ambiguous. If you inverted the order of df2 and df1 when joining them, as follows:

df3 = df1.join(df2, (df1.c1 == df2.c2), 'full')

you would get the following error:

pyspark.sql.utils.AnalysisException: Column c2#6 are ambiguous.

So why don't we have this error when executing df2.join(df1, ...)?

You have your answer in the file DetectAmbiguousSelfJoin in Spark's code:

// When self-join happens, the analyzer asks the right side plan to generate
// attributes with new exprIds. If a plan of a Dataset outputs an attribute which
// is referred by a column reference, and this attribute has different exprId than
// the attribute of column reference, then the column reference is ambiguous, as it
// refers to a column that gets regenerated by self-join.

It means that when doing df2.join(df1, ...), the columns used in the join condition are only checked against df1. As in our case we didn't perform any transformation on df1 (contrary to df2, which was filtered), the exprIds of df1's columns didn't change, and thus no ambiguous-column error is raised.

I've created an issue on Spark Jira about this behaviour, see SPARK-36874 (the bug was fixed in version 3.2.0).

How to anticipate such behaviour?

You have to be very careful about whether your join is a self-join. If you start from a dataframe df1, perform some transformations on it to get df2, and then join df1 and df2, you risk running into this behaviour. To mitigate it, always put the original dataframe first when doing such a join, i.e. df1.join(df2, ...) instead of df2.join(df1, ...). By doing so, you will get an AnalysisException: Column x are ambiguous whenever Spark fails to select the right columns.
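
As a complement, here is a minimal sketch of the alias workaround mentioned in the question (not part of the original answer): aliasing both sides forces each column reference in the condition to be resolved against a specific side, so the self-join is no longer ambiguous.

from pyspark.sql import functions as F

# Alias both sides and qualify the join columns with the aliases.
df3 = df2.alias('left').join(
    df1.alias('right'),
    F.col('right.c1') == F.col('left.c2'),
    'full')
df3.show()
# This should produce the desired df6-like result from the second run.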

Answered Oct 02 '22 by Vincent Doba