I am using the DataFrame API of pyspark (Apache Spark) and am running into the following problem:
When I join two DataFrames that originate from the same source DataFrame, the resulting DF will explode to a huge number of rows. A quick example:
I load a DataFrame with n rows from disk:
df = sql_context.parquetFile('data.parquet')
Then I create two DataFrames from that source.
df_one = df.select('col1', 'col2')
df_two = df.select('col1', 'col3')
Finally I want to (inner) join them back together:
df_joined = df_one.join(df_two, df_one['col1'] == df_two['col1'], 'inner')
The key in col1 is unique. The resulting DataFrame should have n rows; however, it has n*n rows.
That does not happen when I load df_one and df_two from disk directly. I am on Spark 1.3.0, but this also happens on the current 1.4.0 snapshot.
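For reference, here is the whole sequence as one self-contained sketch; the small in-memory DataFrame is only a stand-in for data.parquet (with my real parquet-backed data, this is where the count comes back as n*n instead of n):
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext('local[2]', 'self-join-repro')
sql_context = SQLContext(sc)

# Toy stand-in for data.parquet: n rows with a unique key in col1.
df = sql_context.createDataFrame(
    [Row(col1=i, col2='a%d' % i, col3='b%d' % i) for i in range(5)])

df_one = df.select('col1', 'col2')
df_two = df.select('col1', 'col3')

df_joined = df_one.join(df_two, df_one['col1'] == df_two['col1'], 'inner')
print(df_joined.count())  # expected n (5 here); with my parquet-backed source I get n*n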
Can anyone explain why that happens?
If I'm reading this correctly, df_two doesn't have a col2
df_one = df.select('col1', 'col2')
df_two = df.select('col1', 'col3')
So when you do:
df_one.join(df_two, df_one['col1'] == df_two['col2'], 'inner')
That should fail. If you meant to say:
df_one.join(df_two, df_one['col1'] == df_two['col1'], 'inner')
then the fact that you're loading both from the same DataFrame should have no impact. I would suggest that you do:
df_one.show()
df_two.show()
To ensure that the data you've selected is what you expected.
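Checking the schemas explicitly can also help here, since the question is really which columns each projection actually contains:
df_one.printSchema()
df_two.printSchema()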
I'm seeing this problem in my large dataset too, on Spark 1.3. Unfortunately, join works correctly in the small, contrived examples I made up. I suspect there is some underlying bug in the steps preceding the join.
Performing the join (Note: DateTime is just a string):
> join = df1.join(df2, df1.DateTime == df2.DateTime, "inner")
> join.count()
250000L
This is obviously returning the full 500*500 Cartesian product.
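A quick sanity check, with the same df1 and df2 as above:
> df1.count() * df2.count()   # 500 * 500 = 250000, i.e. the join.count() above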
What does work for me is switching to SQL:
> sqlc.registerDataFrameAsTable(df1, "df1")
> sqlc.registerDataFrameAsTable(df2, "df2")
> join = sqlc.sql("select * from df1, df2 where df1.DateTime = df2.DateTime")
> join.count()
471L
That value looks right.
Seeing this, I personally will not be using pyspark's DataFrame.join() until I can understand this difference better.
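For completeness, one pattern I have seen suggested for self-joins on the DataFrame API is to alias both sides and refer to the join columns through those aliases. This is just a sketch and I have not verified that it avoids the miscount above:
from pyspark.sql.functions import col

# assumes df1 and df2 are derived from the same parent DataFrame, as above
a = df1.alias('a')
b = df2.alias('b')
aliased_join = a.join(b, col('a.DateTime') == col('b.DateTime'), 'inner')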