Outer join Spark dataframe with non-identical join column and then merge join column

Suppose I have the following DataFrames in PySpark:

from pyspark.sql import Row

df1 = sqlContext.createDataFrame([Row(name='john', age=50), Row(name='james', age=25)])
df2 = sqlContext.createDataFrame([Row(name='john', weight=150), Row(name='mike', weight=115)])
df3 = sqlContext.createDataFrame([Row(name='john', age=50, weight=150), Row(name='james', age=25, weight=None), Row(name='mike', age=None, weight=115)])

Now suppose I want to create df3 by joining/merging df1 and df2.

I tried doing

df1.join(df2, df1.name == df2.name, 'outer')

This doesn't quite work because it produces two name columns. I then need to combine them somehow, so that a name missing from one column is filled in from the other.

How would I do that? Or is there a better way to create df3 from df1 and df2?

asked Aug 23 '15 09:08 by plam


2 Answers

You can use the coalesce function, which returns its first non-null argument.

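from pyspark.sql.functions import coalesce

# Alias both sides so each name column can be referenced unambiguously
df1 = df1.alias("df1")
df2 = df2.alias("df2")

# Merge the two name columns into name_, drop the originals, then rename name_ back to name
(df1.join(df2, df1.name == df2.name, 'outer')
  .withColumn("name_", coalesce("df1.name", "df2.name"))
  .drop("name")
  .withColumnRenamed("name_", "name"))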
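
To make the "first non-null argument" behaviour concrete, here is a minimal plain-Python sketch of what coalesce does to the two name columns. It is only a model, not Spark code: the coalesce helper below is a hypothetical stand-in, and the joined_names pairs are written out by hand to mirror what the outer join above would pair up for the question's data (row order is illustrative).

```python
def coalesce(*values):
    """Plain-Python stand-in: return the first argument that is not None, else None."""
    return next((v for v in values if v is not None), None)


# (df1.name, df2.name) pairs as the outer join would line them up.
# None marks a row that exists on only one side of the join.
joined_names = [("john", "john"), ("james", None), (None, "mike")]

# Take the left name when present, otherwise fall back to the right one.
merged = [coalesce(left, right) for left, right in joined_names]
print(merged)  # ['john', 'james', 'mike']
```

Note that the check is against None specifically, so a falsy but present value such as 0 or an empty string would still be kept.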

answered Dec 11 '22 14:12 by zero323


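This is a little late, but there is a simpler solution if someone needs it. It takes only a small change to the original poster's attempt: pass the join column as a name (a string) instead of an equality expression, and the result has a single name column: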

df1.join(df2, 'name', 'outer')

answered Dec 11 '22 14:12 by V. Samma