 

Cleanest, most efficient syntax to perform DataFrame self-join in Spark

In standard SQL, when you join a table to itself, you can create aliases for the tables to keep track of which columns you are referring to:

SELECT a.column_name, b.column_name ...
FROM table1 a, table1 b
WHERE a.common_field = b.common_field;

There are two ways I can think of to achieve the same thing using the Spark DataFrame API:

Solution #1: Rename the columns

There are a couple of different methods for this, described in answers to a related question. This one just renames all the columns with a specific suffix:

df.toDF(df.columns.map(_ + "_R"):_*) 

For example, you can do:

df.join(df.toDF(df.columns.map(_ + "_R"):_*), $"common_field" === $"common_field_R") 
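
Put together, a minimal, self-contained sketch of this approach might look like the following (the sample data and a Spark 2.x-style SparkSession are assumptions for illustration):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("self-join-example").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical sample data with a shared key column.
val df = Seq((1, "a"), (2, "b"), (3, "a")).toDF("id", "common_field")

// Rename every column with a "_R" suffix so the right side is unambiguous.
val dfR = df.toDF(df.columns.map(_ + "_R"): _*)

// Join on the original and suffixed column names; every column stays addressable.
val joined = df.join(dfR, $"common_field" === $"common_field_R")
joined.select($"id", $"id_R", $"common_field").show()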

Solution #2: Copy the reference to the DataFrame

Another simple solution is to just do this:

val df: DataFrame = ....
val df_right = df

df.join(df_right, df("common_field") === df_right("common_field"))

Both of these solutions work, and I could see each being useful in certain situations. Are there any internal differences between the two I should be aware of?

asked Mar 27 '16 by David Griffin

People also ask

What is Spark self join?

A self join is a join in which a DataFrame is joined to itself. It is typically used to identify child/parent relationships. In Spark, you can perform a self join in two ways: join the DataFrame to itself with the DataFrame API, or write a self-join query and execute it with Spark SQL.
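
As a rough sketch of both methods (the parent/child data and column names are made up, and a SparkSession `spark` with its implicits imported is assumed, as in the example above):

// Hypothetical employee/manager data: (id, name, manager_id).
val employees = Seq(
  (1, "Alice", 0),   // 0 = no manager
  (2, "Bob",   1),
  (3, "Carol", 1)
).toDF("id", "name", "manager_id")

// Method 1: self join through the DataFrame API, using aliases.
val viaApi = employees.as("e")
  .join(employees.as("m"), $"e.manager_id" === $"m.id")
  .select($"e.name", $"m.name".as("manager_name"))

// Method 2: register a temporary view and run a SQL self join (Spark 2.x API).
employees.createOrReplaceTempView("employees")
val viaSql = spark.sql(
  """SELECT e.name, m.name AS manager_name
    |FROM employees e
    |JOIN employees m ON e.manager_id = m.id""".stripMargin)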

What is the default join type in Spark?

The inner join is the default join in Spark SQL. It selects rows that have matching values in both relations.
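
In other words, leaving the join type out is the same as passing "inner" explicitly; a small sketch, reusing the hypothetical df from above:

// These two calls are equivalent: with no join type argument Spark performs an inner join.
val inner1 = df.as("a").join(df.as("b"), $"a.common_field" === $"b.common_field")
val inner2 = df.as("a").join(df.as("b"), $"a.common_field" === $"b.common_field", "inner")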


1 Answer

There are at least two different ways you can approach this, either by aliasing:

df.as("df1").join(df.as("df2"), $"df1.foo" === $"df2.foo") 

or using name-based equality joins:

// Note that it will result in ambiguous column names
// so using aliases here could be a good idea as well.
// df.as("df1").join(df.as("df2"), Seq("foo"))

df.join(df, Seq("foo"))

In general, column renaming, while the ugliest, is the safest practice across all versions. There have been a few bugs related to column resolution (we found one on SO not so long ago), and some details may differ between parsers (HiveContext versus the standard SQLContext) if you use raw expressions.

Personally, I prefer aliases because of their resemblance to idiomatic SQL and because they can be used outside the scope of a specific DataFrame object.

Regarding performance: unless you're interested in close-to-real-time processing, there should be no performance difference whatsoever. All of these approaches should generate the same execution plan.
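
If you want to verify this yourself (not part of the original answer, just a quick check), you can compare the plans Spark prints for each variant:

// Both variants should print essentially the same physical plan.
df.as("df1").join(df.as("df2"), $"df1.common_field" === $"df2.common_field").explain()
df.join(df.toDF(df.columns.map(_ + "_R"): _*), $"common_field" === $"common_field_R").explain()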

answered Sep 20 '22 by zero323