 

PySpark DataFrame.filter not performing as it should

I am running into a problem when executing the code below:

from pyspark import SparkContext
from pyspark.sql import functions as F
from pyspark.sql import Row, HiveContext

# HiveContext requires an existing SparkContext
sc = SparkContext()
hc = HiveContext(sc)
rows1 = [Row(id1 = '2', id2 = '1', id3 = 'a'),
         Row(id1 = '3', id2 = '2', id3 = 'a'),
         Row(id1 = '4', id2 = '3', id3 = 'b')]
df1 = hc.createDataFrame(rows1)
df2 = df1.filter(F.col("id3") == "a")
df3 = df1.join(df2, df1.id2 == df2.id1, "inner")

When I run the code above, df3 is an empty DataFrame. However, if I change the code as below, it gives the correct result (a DataFrame with 2 rows):

from pyspark import SparkContext
from pyspark.sql import functions as F
from pyspark.sql import Row, HiveContext

# HiveContext requires an existing SparkContext
sc = SparkContext()
hc = HiveContext(sc)
rows1 = [Row(id1 = '2', id2 = '1', id3 = 'a'),
         Row(id1 = '3', id2 = '2', id3 = 'a'),
         Row(id1 = '4', id2 = '3', id3 = 'b')]
df1 = hc.createDataFrame(rows1)
rows2 = [Row(id1 = '2', id2 = '1', id3 = 'a'),
         Row(id1 = '3', id2 = '2', id3 = 'a'),
         Row(id1 = '4', id2 = '3', id3 = 'b')]
df1_temp = hc.createDataFrame(rows2)
df2 = df1_temp.filter(F.col("id3") == "a")
df3 = df1.join(df2, df1.id2 == df2.id1, "inner")

So my question is: why do I have to create a temporary DataFrame here? And if I can't get at the HiveContext in my part of the project, how can I make a duplicate of an existing DataFrame?

asked Apr 24 '18 07:04 by WEIHANG LIU

2 Answers

I believe the problem you've hit here is an instance of a more general issue: certain types of DataFrame self-joins (including joins of a DataFrame against a filtered copy of itself) can produce ambiguous or incorrect query plans.

There are several Spark JIRAs related to this; here are some notable ones:

  • SPARK-15063: "filtering and joining back doesn't work" appears to be the closest match for the specific problem you've reported.
  • SPARK-17154: "Wrong result can be returned or AnalysisException can be thrown after self-join or similar operations" has a good discussion of the underlying cause.

Other JIRA tickets cover different manifestations of these problems; you can discover them by following the chains of "relates to" links starting from the tickets listed above.

This ambiguity only crops up when referencing columns via the DataFrame instance (by subscripting, as in df["mycol"], or by attribute access, as in df.mycol). It can be avoided by aliasing the DataFrames and referring to columns through those aliases. For example, the following works correctly:

>>> from pyspark.sql import functions as F
>>> df1 = hc.createDataFrame(rows1).alias("df1")
>>> df2 = df1.filter(F.col("id3") == "a").alias("df2")
>>> df3 = df1.join(df2, F.col("df1.id2") == F.col("df2.id1"), "inner")
>>> df3.show()
+---+---+---+---+---+---+
|id1|id2|id3|id1|id2|id3|
+---+---+---+---+---+---+
|  4|  3|  b|  3|  2|  a|
|  3|  2|  a|  2|  1|  a|
+---+---+---+---+---+---+
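
As for the second part of the question: if you can't get at the HiveContext in your part of the project, a workaround often used for this class of bug (a sketch, not an official guarantee; it assumes some SQLContext/SparkSession has already been created in the process, since creating one is what attaches toDF to RDDs) is to round-trip the DataFrame through its RDD and schema. The rebuilt copy gets fresh column attribute IDs, so the self-join condition no longer resolves ambiguously:

>>> # Rebuild the DataFrame from its own RDD and schema; the copy has
>>> # fresh column attribute IDs, breaking the shared lineage that
>>> # confuses the analyzer during the self-join.
>>> df1_copy = df1.rdd.toDF(df1.schema)
>>> df2 = df1_copy.filter(F.col("id3") == "a")
>>> df3 = df1.join(df2, df1.id2 == df2.id1, "inner")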
answered Nov 09 '22 15:11 by Josh Rosen

I see the same behavior with this data set in Spark 2.0, but not consistently: a slightly different DataFrame works fine.

df1 = spark.createDataFrame(
    [(1, 2, 'a'), (2, 2, 'a'), (3, 4, 'b')], ['id1', 'id2', 'id3']
)
df1.show()

+---+---+---+
|id1|id2|id3|
+---+---+---+
|  1|  2|  a|
|  2|  2|  a|
|  3|  4|  b|
+---+---+---+

df2 = df1.filter(df1.id3 == 'a')
df2.show()

+---+---+---+
|id1|id2|id3|
+---+---+---+
|  1|  2|  a|
|  2|  2|  a|
+---+---+---+


df3 = df1.join(df2, df1.id2 == df2.id1, 'inner')
df3.show()

+---+---+---+---+---+---+
|id1|id2|id3|id1|id2|id3|
+---+---+---+---+---+---+
|  2|  2|  a|  1|  2|  a|
|  2|  2|  a|  2|  2|  a|
+---+---+---+---+---+---+

This looks like a bug, though I have not tried later versions of Spark. You may want to report it.
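
If you do report it, one way to gather evidence (a sketch using the standard DataFrame.explain API) is to print the extended query plan for both the failing and the working variants and compare how the join condition was resolved:

# Print the parsed, analyzed, optimized, and physical plans;
# comparing the failing and working variants shows how the join
# condition df1.id2 == df2.id1 was actually resolved.
df3.explain(True)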

answered Nov 09 '22 14:11 by Gopala