
Removing duplicate columns after a DF join in Spark

When you join two DataFrames that share a column name:

df = df1.join(df2, df1['id'] == df2['id'])

The join works fine, but you can't reference the id column afterwards because it is ambiguous, and you get the following exception:

pyspark.sql.utils.AnalysisException: "Reference 'id' is ambiguous, could be: id#5691, id#5918.;"

This makes id unusable.

The following function solves the problem:

def join(df1, df2, cond, how='left'):
    df = df1.join(df2, cond, how=how)
    repeated_columns = [c for c in df1.columns if c in df2.columns]
    for col in repeated_columns:
        df = df.drop(df2[col])
    return df

What I don't like about it is that I have to iterate over the column names and drop them one by one. This looks really clunky.

Do you know of any other solution that will either join and remove duplicates more elegantly or delete multiple columns without iterating over each of them?

thecheech asked Oct 26 '17 01:10



4 Answers

If the join columns have the same names in both DataFrames and you only need an equi-join, you can specify the join columns as a list, in which case the result keeps only one copy of each join column:

df1.show()
+---+----+
| id|val1|
+---+----+
|  1|   2|
|  2|   3|
|  4|   4|
|  5|   5|
+---+----+

df2.show()
+---+----+
| id|val2|
+---+----+
|  1|   2|
|  1|   3|
|  2|   4|
|  3|   5|
+---+----+

df1.join(df2, ['id']).show()
+---+----+----+
| id|val1|val2|
+---+----+----+
|  1|   2|   2|
|  1|   2|   3|
|  2|   3|   4|
+---+----+----+

Otherwise you need to give the joined DataFrames aliases and refer to the duplicated columns by alias later:

df1.alias("a").join(
    df2.alias("b"), df1['id'] == df2['id']
).select("a.id", "a.val1", "b.val2").show()
+---+----+----+
| id|val1|val2|
+---+----+----+
|  1|   2|   2|
|  1|   2|   3|
|  2|   3|   4|
+---+----+----+
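The alias approach can be generalized so the select list does not have to be typed by hand. The following is only a sketch: the helper names aliased_select_list and join_with_aliases are hypothetical, the aliases "a" and "b" and the single key column are assumptions, and column names containing dots or other special characters would need backtick quoting that is not handled here.

```python
def aliased_select_list(left_cols, right_cols, left_alias="a", right_alias="b"):
    """Qualified names: every left column, plus right columns the left lacks."""
    left_names = set(left_cols)
    keep_left = [f"{left_alias}.{c}" for c in left_cols]
    keep_right = [f"{right_alias}.{c}" for c in right_cols if c not in left_names]
    return keep_left + keep_right


def join_with_aliases(df1, df2, key="id", how="inner"):
    """Join two PySpark DataFrames on `key` and keep one copy of shared columns."""
    # Imported lazily so the pure helper above has no Spark dependency.
    from pyspark.sql.functions import col

    joined = df1.alias("a").join(
        df2.alias("b"), col(f"a.{key}") == col(f"b.{key}"), how
    )
    # Shared columns are taken from the left side ("a") only.
    return joined.select(*aliased_select_list(df1.columns, df2.columns))
```

With the df1 and df2 shown above, join_with_aliases(df1, df2) would select "a.id", "a.val1" and "b.val2". Note that for right or full outer joins the left-side key can be null for unmatched rows, so the choice of which side to keep matters.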
Psidom answered Oct 19 '22 00:10


For df.join(other, on, how): when on is a column name string, or a list of column name strings, the returned DataFrame has no duplicate columns. When on is a join expression, the result contains duplicate columns, and we can use .drop(df.a) to drop one of them. Example:

cond = [df.a == other.a, df.b == other.bb, df.c == other.ccc]
# the join itself yields two columns named a; .drop(df.a) removes the one coming from df
result = df.join(other, cond, 'inner').drop(df.a)
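When several columns are duplicated after an expression join, one alternative to chaining .drop calls is a single select that lists what to keep. This is a minimal sketch under stated assumptions: join_keeping_left is a hypothetical name, the two DataFrames are assumed not to be derived from one another (otherwise the column references stay ambiguous), and the join type is assumed to return columns from both sides.

```python
def join_keeping_left(df, other, cond, how="inner"):
    """Join on an arbitrary condition and keep a single copy of shared columns.

    Every column of `df` is kept; from `other`, only columns whose names
    do not already appear in `df` are kept.
    """
    joined = df.join(other, cond, how=how)
    left_names = set(df.columns)
    # df[name] / other[name] are references bound to their source DataFrame,
    # so they remain unambiguous even when both sides share a name.
    keep = [df[name] for name in df.columns]
    keep += [other[name] for name in other.columns if name not in left_names]
    return joined.select(*keep)
```

For the condition above, join_keeping_left(df, other, cond) would keep df.a and discard other.a, while bb and ccc survive because their names differ from b and c.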
jerrytim answered Oct 18 '22 22:10


Assuming 'a' is a dataframe with column 'id' and 'b' is another dataframe with column 'id'

I use the following two methods to remove duplicates:

Method 1: Use a column name string as the join expression instead of a boolean expression. This automatically removes the duplicate column for you:

a.join(b, 'id')

Method 2: Rename the column before the join and drop it afterwards:

# withColumnRenamed returns a new DataFrame, so the result must be assigned
b = b.withColumnRenamed('id', 'b_id')
joinexpr = a['id'] == b['b_id']
a.join(b, joinexpr).drop('b_id')
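If more than one column name is shared, the rename step can be applied to all of them at once with toDF, and the prefixed columns can then be dropped by name in a single call. The sketch below is illustrative only: prefix_shared and join_via_rename are hypothetical names, the "b_" prefix is an assumption and must not collide with an existing column of b, and all shared columns from b are discarded in favor of those from a.

```python
def prefix_shared(right_cols, left_cols, prefix="b_"):
    """Return right_cols with every name that also occurs in left_cols prefixed."""
    left_names = set(left_cols)
    return [prefix + c if c in left_names else c for c in right_cols]


def join_via_rename(a, b, key="id", how="inner", prefix="b_"):
    """Rename shared columns of `b`, join on `key`, then drop the renamed columns."""
    # toDF(*names) renames all columns positionally in one step.
    renamed = b.toDF(*prefix_shared(b.columns, a.columns, prefix))
    joined = a.join(renamed, a[key] == renamed[prefix + key], how)
    # Columns that exist only because of the rename are the duplicates.
    to_drop = [c for c in renamed.columns if c not in b.columns]
    return joined.drop(*to_drop)
```

Dropping by string name avoids the ambiguity problem entirely, because after the rename no two columns in the joined result share a name.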
Heapify answered Oct 18 '22 23:10


The code below works with Spark 1.6.0 and above.

salespeople_df.show()
+---+------+-----+
|Num|  Name|Store|
+---+------+-----+
|  1| Henry|  100|
|  2| Karen|  100|
|  3|  Paul|  101|
|  4| Jimmy|  102|
|  5|Janice|  103|
+---+------+-----+

storeaddress_df.show()
+-----+--------------------+
|Store|             Address|
+-----+--------------------+
|  100|    64 E Illinos Ave|
|  101|         74 Grand Pl|
|  102|          2298 Hwy 7|
|  103|No address available|
+-----+--------------------+

Assuming, as in this example, that the shared column has the same name in both DataFrames:

joined=salespeople_df.join(storeaddress_df, ['Store'])
joined.orderBy('Num', ascending=True).show()

+-----+---+------+--------------------+
|Store|Num|  Name|             Address|
+-----+---+------+--------------------+
|  100|  1| Henry|    64 E Illinos Ave|
|  100|  2| Karen|    64 E Illinos Ave|
|  101|  3|  Paul|         74 Grand Pl|
|  102|  4| Jimmy|          2298 Hwy 7|
|  103|  5|Janice|No address available|
+-----+---+------+--------------------+

Passing the shared column name as a list to .join prevents the duplication of that column.

If you also want to remove another column, such as Num in this example, you can just use .drop('colname'):

joined=joined.drop('Num')
joined.show()

+-----+------+--------------------+
|Store|  Name|             Address|
+-----+------+--------------------+
|  103|Janice|No address available|
|  100| Henry|    64 E Illinos Ave|
|  100| Karen|    64 E Illinos Ave|
|  101|  Paul|         74 Grand Pl|
|  102| Jimmy|          2298 Hwy 7|
+-----+------+--------------------+
hussam answered Oct 18 '22 23:10