I have a list:
dates = [2017, 2018, 2018, 2018, 2019, 2019, 2019, 2020, 2020, 2020]
The DataFrame I am trying to add it to has the same length (no issues there).
I tried:
df = df.withColumn("YEARS", dates)
Error: Column needs to be col
I tried also:
df = df.withColumn("YEARS", f.lit(dates))
But that does not work either.
I saw this question: How to add a constant column in a Spark DataFrame?
But nothing there is useful for this case.
UPDATE: This is the expected result:
df_columns... | dates_from_list
---------------------------------
original_df_data| 2017
original_df_data| 2018
original_df_data| 2018
original_df_data| 2018
original_df_data| 2019
original_df_data| 2019
original_df_data| 2019
original_df_data| 2020
original_df_data| 2020
original_df_data| 2020
Your error comes from the fact that you need to pass a Column object to withColumn.
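For illustration, a minimal sketch of what does and does not count as a Column (assuming pyspark.sql.functions is imported as f, as in your attempts):
from pyspark.sql import functions as f
# f.lit(2017) returns a Column, so this works: every row gets the constant 2017
df = df.withColumn("YEARS", f.lit(2017))
# a plain Python list is not a Column, hence the error from your first attempt;
# depending on the Spark version, f.lit(dates) either fails or puts the whole list
# on every row as an array literal, neither of which gives one value per row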
Here are two ways to add your dates as a new column on a Spark DataFrame (the join is made using the order of records in each), depending on the size of your dates data.
A concise way to achieve it is to apply a UDF to a monotonically increasing id:
from pyspark.sql.functions import udf, monotonically_increasing_id
df = [...] # some Spark DataFrame with 10 records
dates = [2017, 2018, 2018, 2018, 2019, 2019, 2019, 2020, 2020, 2020]
# on a single partition, monotonically_increasing_id() yields consecutive ids 0, 1, ..., 9,
# which are used to look up the matching value in the Python list
df = df.repartition(1).withColumn(
    "YEARS",
    udf(lambda id: dates[id])(monotonically_increasing_id()))
df.show()
outputs:
+---+-----+
|...|YEARS|
+---+-----+
|...| 2017|
|...| 2018|
|...| 2018|
|...| 2018|
|...| 2019|
|...| 2019|
|...| 2019|
|...| 2020|
|...| 2020|
|...| 2020|
+---+-----+
Note: The .repartition(1) ensures that the generated ids are consecutive. This repartitioning to a single partition can be avoided if you have another way to map each record to a value in dates (like a previously built id column; see the sketch below).
In this use case, since the Python list is expected to be quite small, your DataFrame is also quite small, so this repartitioning is not a big deal.
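For example, one possible way to build such a consecutive id column without forcing a single partition is RDD.zipWithIndex (a sketch that is not part of the original snippet; names are illustrative):
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
dates = [2017, 2018, 2018, 2018, 2019, 2019, 2019, 2020, 2020, 2020]
# zipWithIndex assigns consecutive indices 0..N-1 following the current row order,
# without moving everything to one partition
df_with_id = (df.rdd.zipWithIndex()
    .map(lambda pair: pair[0] + (pair[1],))
    .toDF(df.columns + ["id"]))
# look up the matching value in the Python list for each row
df_with_id.withColumn("YEARS", udf(lambda i: dates[i], IntegerType())("id")).show()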
/!\ Why it will not scale if the DataFrame and the Python list are too big: .repartition(1) may produce a single, very large partition that is slow to process (because it is huge, and because if it does not fit in execution memory it may require a lot of additional disk I/O to spill RDD blocks to disk), or it may make the job crash with an OutOfMemoryError.
Here is another approach that handles millions of rows far better, by manipulating the ids and dates columns with pandas and avoiding any repartitioning of the Spark DataFrame.
It can be done like this:
import pandas as pd
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.getOrCreate()
# some spark DataFrame of length N
df = [...]
# generate monotonically increasing ids (not consecutive) without repartitioning the Spark DataFrame
df = df.withColumn("id", monotonically_increasing_id())
# get generated ids (not consecutive) as a mono-column pandas DataFrame
spark_df_ids = df.select("id").toPandas()
# some python list of length N
dates = [2017, 2018, 2018, 2018, 2019, ..., 2019, 2019, 2020, 2020, 2020]
# build pandas DataFrame from dates
dates_pandas_df = pd.DataFrame(dates, columns=["YEARS"])
# append the id column to the dates in pandas
dates_and_ids_pandas_df = dates_pandas_df.join(spark_df_ids)
# convert from pandas DataFrame to spark DataFrame
dates_and_ids_spark_df = spark.createDataFrame(dates_and_ids_pandas_df)
# finally, add the dates column to the Spark DataFrame with a join in Spark
df.join(dates_and_ids_spark_df, ["id"]).show()
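Note: if you do not want to keep the helper id column in the result, you can drop it after the join, e.g. df.join(dates_and_ids_spark_df, ["id"]).drop("id").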
Important: The conversion to and from pandas can be made faster by using Apache Arrow.
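For example (a sketch; this is the Spark 3.x configuration key, while Spark 2.x used spark.sql.execution.arrow.enabled):
# enable Arrow-based conversion between Spark and pandas DataFrames
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")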