Add a new column to a PySpark DataFrame from a Python list

I have a list:

dates = [2017, 2018, 2018, 2018, 2019, 2019, 2019, 2020, 2020, 2020]

The DataFrame I am trying to add it to has the same length (no issue there).

I tried:

df = df.withColumn("YEARS", dates)
Error: Column needs to be col

I tried also:

df = df.withColumn("YEARS", f.lit(dates))

But that does not work either.

I saw this question: How to add a constant column in a Spark DataFrame?

But nothing there is useful for this case.

UPDATE: What the expected result is:

df_columns...   | dates_from_list
---------------------------------
original_df_data| 2017
original_df_data| 2018
original_df_data| 2018
original_df_data| 2018
original_df_data| 2019
original_df_data| 2019
original_df_data| 2019
original_df_data| 2020
original_df_data| 2020
original_df_data| 2020
asked Mar 03 '23 by Toby Djelyinski

1 Answer

Your error comes from the fact that withColumn expects a Column object as its second argument, not a Python list.
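For context, here is a minimal sketch of what withColumn does accept, namely a Column expression such as a constant built with lit:

from pyspark.sql import functions as f

# works: lit() wraps a single scalar value into a Column
df = df.withColumn("YEARS", f.lit(2017))

# fails: a plain Python list is not a Column, hence the error you saw
# df = df.withColumn("YEARS", dates)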

Here are two ways to add your dates as a new column to a Spark DataFrame (the join is made using the order of the records in each), depending on the size of your dates data.

1) If you are working with a small dataset

A concise way to achieve this is to apply a UDF to a monotonically increasing id:

from pyspark.sql.functions import udf, monotonically_increasing_id

df = [...]  # 10 records

dates = [2017, 2018, 2018, 2018, 2019, 2019, 2019, 2020, 2020, 2020]

# with a single partition, the generated ids are consecutive (0..N-1) and can index the Python list
df = df.repartition(1).withColumn(
    "YEARS",
    udf(lambda id: dates[id])(monotonically_increasing_id()))

df.show()

outputs:

+---+-----+
|...|YEARS|
+---+-----+
|...| 2017|
|...| 2018|
|...| 2018|
|...| 2018|
|...| 2019|
|...| 2019|
|...| 2019|
|...| 2020|
|...| 2020|
|...| 2020|
+---+-----+

Note: The .repartition(1) ensures that the generated ids are consecutive. Repartitioning to a single partition can be avoided if you have another way to map each record to a value in dates (for instance a previously built id column). In this use case, since the Python list object is quite small, your DataFrame is presumably quite small too, so this repartitioning is not a big deal.
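As an illustration of that alternative, here is a minimal sketch assuming a hypothetical row_id column that already holds consecutive 0-based positions (this column is not part of the original question):

from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType

# look up the date for each record by its position; no repartition(1) needed
years_udf = udf(lambda i: dates[i], IntegerType())
df = df.withColumn("YEARS", years_udf(col("row_id")))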

/!\ Why this will not scale if the DataFrame and the Python list are too big:

  • repartitioning the DataFrame requires shuffles/exchanges, which are expensive
  • the .repartition(1) may produce one very large partition that can be very slow to process (because it is huge, and because if it does not fit in execution memory it may require many additional disk I/Os to spill RDD blocks to disk), or make the job crash with an OutOfMemoryError
  • the Python list is captured by the UDF (through the lambda closure), which means it will be broadcast to each executor of your cluster

2) If you are working with a dataset of millions of rows or more

Here is another approach that scales much better to millions of rows: it manipulates the ids and dates columns with pandas and avoids any repartitioning of the Spark DataFrame.

It can be done like this:

import pandas as pd
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.session import SparkSession

spark = SparkSession.builder.getOrCreate()

# some spark DataFrame of length N
df = [...]  

# generate monotonically increasing ids (not consecutive) without repartitioning the Spark DataFrame
df = df.withColumn("id", monotonically_increasing_id())

# get the generated ids (not consecutive) as a single-column pandas DataFrame
spark_df_ids = df.select("id").toPandas()

# some python list of length N
dates = [2017, 2018, 2018, 2018, 2019, ..., 2019, 2019, 2020, 2020, 2020]

# build pandas DataFrame from dates
dates_pandas_df = pd.DataFrame(dates, columns=["YEARS"])

# append the id column to the dates in pandas
# (the join aligns the two DataFrames by index position, relying on toPandas() preserving the record order)
dates_and_ids_pandas_df = dates_pandas_df.join(spark_df_ids)

# convert from pandas DataFrame to spark DataFrame
dates_and_ids_spark_df = spark.createDataFrame(dates_and_ids_pandas_df)

# Perform the final adding of the dates column to the Spark DataFrame with a join in Spark
df.join(dates_and_ids_spark_df, ["id"]).show()

Important: the conversions to and from pandas can be made faster by using Apache Arrow.
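For example (the exact configuration key depends on your Spark version and is given here as an assumption: Spark 3.x uses the name below, while Spark 2.x uses spark.sql.execution.arrow.enabled):

# enable Arrow-based columnar transfers for toPandas() / createDataFrame(pandas_df)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")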

answered Mar 06 '23 by bonnal-enzo