
Joining two DataFrames from the same source

I am using the DataFrame API of pyspark (Apache Spark) and am running into the following problem:

When I join two DataFrames that originate from the same source DataFrame, the resulting DataFrame explodes to a huge number of rows. A quick example:

I load a DataFrame with n rows from disk:

df = sql_context.parquetFile('data.parquet')

Then I create two DataFrames from that source:

df_one = df.select('col1', 'col2')
df_two = df.select('col1', 'col3')

Finally I want to (inner) join them back together:

df_joined = df_one.join(df_two, df_one['col1'] == df_two['col1'], 'inner')

The key in col1 is unique. The resulting DataFrame should have n rows; instead it has n*n rows.

This does not happen when I load df_one and df_two from disk directly. I am on Spark 1.3.0, but it also happens on the current 1.4.0 snapshot.
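
For completeness, here is a minimal self-contained sketch of the setup (the in-memory DataFrame with made-up values stands in for data.parquet; col1 is the unique key):

    from pyspark import SparkContext
    from pyspark.sql import SQLContext

    sc = SparkContext('local', 'self-join-repro')
    sql_context = SQLContext(sc)

    # stand-in for sql_context.parquetFile('data.parquet')
    df = sql_context.createDataFrame(
        [(1, 'a', 'x'), (2, 'b', 'y'), (3, 'c', 'z')],
        ['col1', 'col2', 'col3'])

    df_one = df.select('col1', 'col2')
    df_two = df.select('col1', 'col3')

    df_joined = df_one.join(df_two, df_one['col1'] == df_two['col1'], 'inner')
    print(df_joined.count())  # should be 3 (i.e. n); I get n*n instead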

Can anyone explain why that happens?

asked Apr 21 '15 by karlson

2 Answers

If I'm reading this correctly, df_two doesn't have a col2:

    df_one = df.select('col1', 'col2')
    df_two = df.select('col1', 'col3')

So when you do:

    df_one.join(df_two, df_one['col1'] == df_two['col2'], 'inner')

That should fail. Perhaps you meant:

    df_one.join(df_two, df_one['col1'] == df_two['col1'], 'inner')

Either way, the fact that you're loading both from the same source DataFrame should have no impact. I would suggest running:

    df_one.show()
    df_two.show()

to make sure the data you've selected is what you expect.
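
For example (a sketch; these are standard DataFrame introspection calls, nothing specific to this bug):

    df_one.printSchema()  # should list col1 and col2
    df_two.printSchema()  # should list col1 and col3
    print(df_one.count(), df_two.count())  # both should equal n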

answered by user3124181

I'm seeing this problem with my large dataset too, on Spark 1.3. Unfortunately, join works correctly in the small, contrived examples I made up. I suspect there is some underlying bug in the steps preceding the join.

Performing the join (Note: DateTime is just a string):

    join = df1.join(df2, df1.DateTime == df2.DateTime, "inner")
    join.count()
    # returns 250000L

This is obviously the full 500*500 cartesian product, not the inner join.

What does work for me is switching to SQL:

    sqlc.registerDataFrameAsTable(df1, "df1")
    sqlc.registerDataFrameAsTable(df2, "df2")
    join = sqlc.sql("select * from df1, df2 where df1.DateTime = df2.DateTime")
    join.count()
    # returns 471L

That value looks right.

Seeing this, I personally will not be using pyspark's DataFrame.join() until I can understand this difference better.
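
For what it's worth, here is a sketch of another workaround that is often suggested for self-join ambiguity (it assumes DataFrame.alias and pyspark.sql.functions.col, both available in Spark 1.3): alias each side so the join condition names the two lineages explicitly.

    from pyspark.sql.functions import col

    a = df1.alias('a')
    b = df2.alias('b')
    aliased = a.join(b, col('a.DateTime') == col('b.DateTime'), 'inner')
    aliased.count()  # worth comparing against the 471L from the SQL query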

answered by Zach Garner