
How can I rename a PySpark dataframe column by index? (handle duplicated column names)

I have an issue where I need to dynamically update columns in a Spark dataframe.

Basically, I need to loop through the column list and, if a column name already exists in the list, rename that column to its name plus its index.

My attempted code was something like this:

def dup_cols(df):
    for i, icol in enumerate(df.columns):
        for x, xcol in enumerate(df.columns):
            if icol == xcol and i != x:
                # withColumnRenamed takes (existing_name, new_name)
                df = df.withColumnRenamed(xcol, xcol + '_' + str(x))
    return df

But this renames by name (here as xcol), so it hits every column with that name, which doesn't solve my issue.

Can I change this to rename the column in the dataframe by its index? I have searched around for quite a while and found nothing.

I also cannot convert to a Pandas dataframe, so I would need a Spark/PySpark solution to renaming a specific column by its index only.

Thank you!

asked Dec 13 '18 by Turner

1 Answer

You can use pyspark.sql.DataFrame.toDF() to rename the columns:

Returns a new DataFrame with the new specified column names

Here is an example:

data = [
    (1, 2, 3),
    (4, 5, 6),
    (7, 8, 9)
]

df = spark.createDataFrame(data, ["a", "b", "a"])
df.printSchema()
#root
# |-- a: long (nullable = true)
# |-- b: long (nullable = true)
# |-- a: long (nullable = true)

Create new names based on your index logic:

new_names = []
counter = {c: -1 for c in df.columns}
for c in df.columns:
    new_c = c
    counter[c] += 1
    new_c += str(counter[c]) if counter[c] else ""
    new_names.append(new_c)
print(new_names)
#['a', 'b', 'a1']

Now use toDF() to create a new DataFrame with the new column names:

df = df.toDF(*new_names)
df.printSchema()
#root
# |-- a: long (nullable = true)
# |-- b: long (nullable = true)
# |-- a1: long (nullable = true)
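If you want the suffix to be the column's positional index (as the question literally asks) rather than a running duplicate counter, the same toDF() trick works; you only need to build the name list differently. Here is a minimal sketch with a hypothetical helper, dedupe_by_index (my naming, not a PySpark API). It is pure Python over the list of names, so you can test it without a Spark session and then apply it with df.toDF(*dedupe_by_index(df.columns)):

```python
def dedupe_by_index(columns):
    """Suffix each duplicated column name with its positional index."""
    # count occurrences of each name
    counts = {}
    for c in columns:
        counts[c] = counts.get(c, 0) + 1

    out = []
    for i, c in enumerate(columns):
        # only rename names that occur more than once
        out.append(c + '_' + str(i) if counts[c] > 1 else c)
    return out

print(dedupe_by_index(["a", "b", "a"]))
# ['a_0', 'b', 'a_2']
```

Both duplicates get a suffix here, which avoids the ambiguity of leaving the first occurrence untouched; drop the suffix for the first occurrence if you prefer the counter-style output above.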
answered Nov 20 '22 by pault