Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

pySpark withColumn with a function

I have a dataframe which has 2 columns: account_id and email_address, now I want to add one more column updated_email_address which i call some function on email_address to get the updated_email_address. here is my code:

def update_email(email):
  print("== email to be updated: " + email)
  today = datetime.date.today()
  updated = substring(email, -8, 8) + str(today.strftime('%m')) + str(today.strftime('%d')) + "_updated"
  return updated

df.withColumn('updated_email_address', update_email(df.email_address))

but the result showed updated_email_address column as null:

+---------------+--------------+---------------------+
|account_id     |email_address |updated_email_address|
+---------------+--------------+---------------------+
|123456gd7tuhha |[email protected]  |null           |
|djasevneuagsj1 |[email protected]  |null           |
+---------------+--------------+---------------+

inside the function updated_email it printed out:

Column<b'(email_address + == email to be udpated: )'>

also it showed the df's column data type as:

dfData:pyspark.sql.dataframe.DataFrame
account_id:string
email_address:string
updated_email_address:double

why is updated_email_address column type of double?

like image 706
user468587 Avatar asked Sep 16 '25 07:09

user468587


1 Answers

You're calling a Python function with Column type. You have to create udf from update_email and then use it:

update_email_udf = udf(update_email)

However, I'd suggest you to not use UDF fot such transformation, you could do it using only Spark built-in functions (UDFs are known for bad performance) :

df.withColumn('updated_email_address',
              concat(substring(col("email_address"), -8, 8), date_format(current_date(), "ddMM"), lit("_updated"))
             ).show()

You can find here all Spark SQL built-in functions.

like image 103
blackbishop Avatar answered Sep 19 '25 06:09

blackbishop