I am running into a problem when executing the code below:
from pyspark.sql import functions as F
from pyspark.sql import Row, HiveContext
hc = HiveContext(sc)  # sc is the active SparkContext (predefined in the pyspark shell)
rows1 = [Row(id1 = '2', id2 = '1', id3 = 'a'),
Row(id1 = '3', id2 = '2', id3 = 'a'),
Row(id1 = '4', id2 = '3', id3 = 'b')]
df1 = hc.createDataFrame(rows1)
df2 = df1.filter(F.col("id3")=="a")
df3 = df1.join(df2, df1.id2 == df2.id1, "inner")
When I run the code above, df3 is an empty DataFrame. However, if I change the code as below, it gives the correct result (a DataFrame with 2 rows):
from pyspark.sql import functions as F
from pyspark.sql import Row, HiveContext
hc = HiveContext(sc)  # sc is the active SparkContext (predefined in the pyspark shell)
rows1 = [Row(id1 = '2', id2 = '1', id3 = 'a'),
Row(id1 = '3', id2 = '2', id3 = 'a'),
Row(id1 = '4', id2 = '3', id3 = 'b')]
df1 = hc.createDataFrame(rows1)
rows2 = [Row(id1 = '2', id2 = '1', id3 = 'a'),
Row(id1 = '3', id2 = '2', id3 = 'a'),
Row(id1 = '4', id2 = '3', id3 = 'b')]
df1_temp = hc.createDataFrame(rows2)
df2 = df1_temp.filter(F.col("id3")=="a")
df3 = df1.join(df2, df1.id2 == df2.id1, "inner")
So my question is: why do I have to create a temporary DataFrame here? Also, if I can't get the HiveContext in my part of the project, how can I make a duplicate of an existing DataFrame?
I believe that the problem you've hit here is an instance of a more general issue where certain types of DataFrame self-joins (including joins of a DataFrame against filtered copies of itself) can result in the generation of ambiguous or incorrect query plans.
There are several Spark JIRAs related to this; here are some notable ones:
There are other JIRA tickets dealing with different manifestations / aspects of these problems. Those tickets are discoverable by following chains of JIRA "relates to" links starting from the tickets listed above.
This ambiguity only crops up when referencing columns via the DataFrame instance (via subscripting, as in df["mycol"], or via field accesses, as in df.mycol). This ambiguity can be avoided by aliasing DataFrames and referring to columns via the aliases. For example, the following works correctly:
>>> from pyspark.sql import functions as F
>>> df1 = hc.createDataFrame(rows1).alias("df1")
>>> df2 = df1.filter(F.col("id3")=="a").alias("df2")
>>> df3 = df1.join(df2, F.col("df1.id2") == F.col("df2.id1"), "inner")
>>> df3.show()
+---+---+---+---+---+---+
|id1|id2|id3|id1|id2|id3|
+---+---+---+---+---+---+
|  4|  3|  b|  3|  2|  a|
|  3|  2|  a|  2|  1|  a|
+---+---+---+---+---+---+
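To make the ambiguity concrete, here is a minimal plain-Python sketch using the question's data (no Spark involved). It compares the intended condition, df1.id2 == df2.id1, with what happens under the assumption that both column references end up pointing at the same side of the join. That assumption is only a toy model chosen because it reproduces the empty result from the question; it is not taken from Spark's source.

```python
# Plain-Python model of the join from the question; no Spark needed.
rows = [
    {"id1": "2", "id2": "1", "id3": "a"},
    {"id1": "3", "id2": "2", "id3": "a"},
    {"id1": "4", "id2": "3", "id3": "b"},
]

left = rows                                    # stands in for df1
right = [r for r in rows if r["id3"] == "a"]   # stands in for df2

# Intended condition: df1.id2 == df2.id1 (one column from each side).
intended = [(l, r) for l in left for r in right if l["id2"] == r["id1"]]

# ASSUMPTION (toy model): both column references resolve to the left side,
# so the condition never looks at the right-hand row.
misresolved = [(l, r) for l in left for r in right if l["id2"] == l["id1"]]

print(len(intended))     # 2
print(len(misresolved))  # 0
```

The two intended pairs match the rows shown by df3.show() in the aliased version above, while the mis-resolved condition yields nothing because no row of this data set has id1 equal to id2.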
I see the same behavior with this data set in Spark 2.0, but not always for the same operation. A slightly different DataFrame returns a non-empty result:
df1 = spark.createDataFrame(
[(1, 2, 'a'), (2, 2, 'a'), (3, 4, 'b')], ['id1', 'id2', 'id3']
)
df1.show()
+---+---+---+
|id1|id2|id3|
+---+---+---+
|  1|  2|  a|
|  2|  2|  a|
|  3|  4|  b|
+---+---+---+
df2 = df1.filter(df1.id3 == 'a')
df2.show()
+---+---+---+
|id1|id2|id3|
+---+---+---+
|  1|  2|  a|
|  2|  2|  a|
+---+---+---+
df3 = df1.join(df2, df1.id2 == df2.id1, 'inner')
df3.show()
+---+---+---+---+---+---+
|id1|id2|id3|id1|id2|id3|
+---+---+---+---+---+---+
|  2|  2|  a|  1|  2|  a|
|  2|  2|  a|  2|  2|  a|
+---+---+---+---+---+---+
This looks like a bug. I have not tried later versions of Spark, though. You may want to report it.
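As a sanity check, the df3.show() output for this second data set can be compared by hand against the rows that df1.id2 == df2.id1 should produce. The sketch below does that in plain Python (no Spark). The `same_side` list is an assumption for illustration only: it models both column references resolving against df1, which is not confirmed from Spark's source.

```python
# (id1, id2, id3) tuples copied from the df1.show() output above.
df1_rows = [(1, 2, "a"), (2, 2, "a"), (3, 4, "b")]
df2_rows = [r for r in df1_rows if r[2] == "a"]

# Rows that df1.id2 == df2.id1 should produce.
expected = [l + r for l in df1_rows for r in df2_rows if l[1] == r[0]]

# ASSUMPTION (toy model): the condition compares df1.id2 with df1.id1.
same_side = [l + r for l in df1_rows for r in df2_rows if l[1] == l[0]]

# Rows copied from the df3.show() output above.
shown = [(2, 2, "a", 1, 2, "a"), (2, 2, "a", 2, 2, "a")]

print(sorted(expected) == sorted(shown))   # False
print(sorted(same_side) == sorted(shown))  # True
```

Under this reading, the two-row result is not the intended join: the pair with left row (1, 2, a) is missing, and the shown rows match the same-side condition instead. That suggests this data set is affected by the same ambiguity and happens to return rows only because one row has id1 equal to id2.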