
Spark Dataframe distinguish columns with duplicated name

As far as I know, a Spark DataFrame can have multiple columns with the same name, as in the snapshot below:

[
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=125231, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=145831, f=SparseVector(5, {0: 0.0, 1: 0.2356, 2: 0.0036, 3: 0.0, 4: 0.4132})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=147031, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=149231, f=SparseVector(5, {0: 0.0, 1: 0.0032, 2: 0.2451, 3: 0.0, 4: 0.0042}))
]

The result above was created by joining a dataframe to itself; it has four columns, two named a and two named f.

The problem is that when I try to do further calculations with the a column, I can't find a way to select it. I have tried df[0] and df.select('a'); both return the error message below:

AnalysisException: Reference 'a' is ambiguous, could be: a#1333L, a#1335L.

Is there any way in the Spark API to distinguish columns with duplicated names? Or is there some way to rename the columns?

resec asked Nov 18 '15 11:11




10 Answers

Let's start with some data:

from pyspark.mllib.linalg import SparseVector
from pyspark.sql import Row

df1 = sqlContext.createDataFrame([
    Row(a=107831, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
    Row(a=125231, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
])

df2 = sqlContext.createDataFrame([
    Row(a=107831, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
    Row(a=107831, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
])

There are a few ways you can approach this problem. First of all, you can unambiguously reference a column through the DataFrame it came from:

df1.join(df2, df1['a'] == df2['a']).select(df1['f']).show(2)

##  +--------------------+
##  |                   f|
##  +--------------------+
##  |(5,[0,1,2,3,4],[0...|
##  |(5,[0,1,2,3,4],[0...|
##  +--------------------+

You can also use table aliases:

from pyspark.sql.functions import col

df1_a = df1.alias("df1_a")
df2_a = df2.alias("df2_a")

df1_a.join(df2_a, col('df1_a.a') == col('df2_a.a')).select('df1_a.f').show(2)

##  +--------------------+
##  |                   f|
##  +--------------------+
##  |(5,[0,1,2,3,4],[0...|
##  |(5,[0,1,2,3,4],[0...|
##  +--------------------+

Finally you can programmatically rename columns:

df1_r = df1.select(*(col(x).alias(x + '_df1') for x in df1.columns))
df2_r = df2.select(*(col(x).alias(x + '_df2') for x in df2.columns))

df1_r.join(df2_r, col('a_df1') == col('a_df2')).select(col('f_df1')).show(2)

## +--------------------+
## |               f_df1|
## +--------------------+
## |(5,[0,1,2,3,4],[0...|
## |(5,[0,1,2,3,4],[0...|
## +--------------------+
zero323 answered Oct 24 '22 07:10


I would recommend that you change the column names for your join.

df1.select(col("a") as "df1_a", col("f") as "df1_f")
   .join(df2.select(col("a") as "df2_a", col("f") as "df2_f"), col("df1_a") === col("df2_a"))

The resulting DataFrame will have schema

(df1_a, df1_f, df2_a, df2_f)
Glennie Helles Sindholt answered Oct 24 '22 05:10


Instead of writing aliases for every column you join on, you can pass the join column names as a list:

df1.join(df2,['a'])

This works when the join key has the same name in both tables; the key column then appears only once in the result.

See https://kb.databricks.com/data/join-two-dataframes-duplicated-columns.html
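
Note that a name-based join only merges the key columns; any other name shared by both sides still appears twice. As a rough sketch (ambiguous_after_join is a made-up helper for illustration, not part of Spark), you can check the column lists up front:

```python
def ambiguous_after_join(left_columns, right_columns, on):
    """Names shared by both sides that are not join keys.

    Hypothetical helper: after left.join(right, on), with `on` given as a
    list of names, these columns still appear twice in the result.
    """
    keys = set(on)
    right = set(right_columns)
    return [name for name in left_columns if name in right and name not in keys]


# With the question's schema, 'a' is merged but 'f' stays duplicated.
print(ambiguous_after_join(["a", "f"], ["a", "f"], ["a"]))  # prints ['f']
```

With real DataFrames you would call it as ambiguous_after_join(df1.columns, df2.columns, ['a']) and rename or drop whatever it returns before selecting.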

Paul Bendevis answered Oct 24 '22 05:10


You can use the def drop(col: Column) method to drop the duplicated column, for example:

DataFrame:df1

+-------+-----+
| a     | f   |
+-------+-----+
|107831 | ... |
|107831 | ... |
+-------+-----+

DataFrame:df2

+-------+-----+
| a     | f   |
+-------+-----+
|107831 | ... |
|107831 | ... |
+-------+-----+

When I join df1 with df2, the resulting DataFrame looks like this:

val newDf = df1.join(df2,df1("a")===df2("a"))

DataFrame:newDf

+-------+-----+-------+-----+
| a     | f   | a     | f   |
+-------+-----+-------+-----+
|107831 | ... |107831 | ... |
|107831 | ... |107831 | ... |
+-------+-----+-------+-----+

Now we can use the def drop(col: Column) method to drop the duplicated column 'a' or 'f', as follows:

val newDfWithoutDuplicate = df1.join(df2,df1("a")===df2("a")).drop(df2("a")).drop(df2("f"))
StrongYoung answered Oct 24 '22 07:10


This is how we can join two Dataframes on same column names in PySpark.

df = df1.join(df2, ['col1','col2','col3'])

If you call printSchema() after this, you can see that the duplicate columns have been removed.

Nikhil Redij answered Oct 24 '22 05:10


Suppose the DataFrames you want to join are df1 and df2, and you are joining them on column 'a'. Then you have two methods:

Method 1

df1.join(df2,'a','left_outer')

This method is highly recommended: the result contains column 'a' only once.

Method 2

df1.join(df2,df1.a == df2.a,'left_outer').drop(df2.a)

typhoonbxq answered Oct 24 '22 07:10


After digging into the Spark API, I found that I can first use alias to create an alias for the original dataframe, then use withColumnRenamed to manually rename every column on the alias. The join then runs without producing duplicated column names.

For more detail, see the Spark DataFrame API:

pyspark.sql.DataFrame.alias

pyspark.sql.DataFrame.withColumnRenamed

However, I think this is only a troublesome workaround, and I wonder whether there is a better way to solve my problem.
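
A minimal sketch of the renaming step, assuming df is a pyspark.sql.DataFrame and that the prefixed names do not already exist in it. The helper name with_prefix and the prefixes are made up for illustration; only the columns attribute and the withColumnRenamed method are used:

```python
def with_prefix(df, prefix):
    """Return df with every column renamed to prefix + original name.

    Hypothetical helper; `df` is assumed to be a pyspark.sql.DataFrame.
    """
    renamed = df
    for name in df.columns:
        # withColumnRenamed returns a new DataFrame each time.
        renamed = renamed.withColumnRenamed(name, prefix + name)
    return renamed


# Intended use with the question's self-join (needs a Spark session):
#   left = with_prefix(df, "l_")
#   right = with_prefix(df, "r_")
#   joined = left.join(right, left["l_a"] == right["r_a"])
#   joined.select("l_a", "r_f")   # no longer ambiguous
```

The loop saves typing one withColumnRenamed call per column, but it is still the same workaround.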

resec answered Oct 24 '22 05:10


This might not be the best approach, but if you want to rename the duplicate columns (after the join), you can do so using this tiny function.

def rename_duplicate_columns(dataframe):
    columns = dataframe.columns
    duplicate_column_indices = list(set([columns.index(col) for col in columns if columns.count(col) == 2]))
    for index in duplicate_column_indices:
        columns[index] = columns[index]+'2'
    dataframe = dataframe.toDF(*columns)
    return dataframe
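
The function above only handles names that occur exactly twice. A more general sketch, assuming you just want a list of unique names computed from dataframe.columns (dedupe_column_names is a hypothetical helper, not a Spark API), keeps the first occurrence and adds a numeric suffix to later ones:

```python
from collections import Counter


def dedupe_column_names(columns):
    """Return unique names, suffixing repeats with _2, _3, ...

    Hypothetical helper: the first occurrence of a name is kept as-is,
    and a suffix is skipped if it would collide with an existing name.
    """
    taken = set(columns)
    seen = Counter()
    result = []
    for name in columns:
        seen[name] += 1
        if seen[name] == 1:
            result.append(name)
            continue
        n = seen[name]
        candidate = f"{name}_{n}"
        while candidate in taken:
            n += 1
            candidate = f"{name}_{n}"
        taken.add(candidate)
        result.append(candidate)
    return result


print(dedupe_column_names(["a", "f", "a", "f"]))  # prints ['a', 'f', 'a_2', 'f_2']
```

Because toDF assigns names by position, the result can be applied with dataframe.toDF(*dedupe_column_names(dataframe.columns)).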
Akash answered Oct 24 '22 05:10


If only the key column is the same in both tables, then try the following (approach 1):

left.join(right, 'key', 'inner')

rather than (approach 2):

left.join(right, left.key == right.key, 'inner')

Pros of using approach 1:

  • the 'key' column appears only once in the final dataframe
  • the syntax is easy to use

Cons of using approach 1:

  • it only helps with the key column
  • in a left join, if you plan to count nulls in the right key, this will not work. In that case, you have to rename one of the keys as mentioned above.
Manish Singla answered Oct 24 '22 05:10


If you have a more complicated use case than the one described in Glennie Helles Sindholt's answer, e.g. you have a few other non-join columns that also share a name and want to distinguish them while selecting, it's best to use aliases, e.g.:

df3 = df1.select("a", "b").alias("left")\
   .join(df2.select("a", "b").alias("right"), ["a"])\
   .select("left.a", "left.b", "right.b")

df3.columns
['a', 'b', 'b']
Wassermann answered Oct 24 '22 06:10