I have a PySpark dataframe (source_df) with a column whose values are comma-separated.
I am trying to replace those values using a lookup based on another dataframe (lookup_df).
source_df

A    B      T    ... followed by N unrelated columns ...
foo  a,b,c  sam
bar  k,a,c  bob
faz  b,a,f  sam

lookup_df

C  D
a  h1
b  h2
c  h3

output dataframe:

A    T    B      new_col      ... followed by N unrelated columns ...
foo  sam  a,b,c  h1,h2,h3
bar  bob  k,a,c  EMPTY,h1,h3
faz  sam  b,a,f  h2,h1,EMPTY
Column A is a primary key and is always unique. Column T is unique for a given value of A.
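Before the Spark answers, the intended transformation can be sketched in plain Python (no Spark needed), assuming lookup_df is small enough to treat as a dict. The function name `replace_tokens` is just for illustration:

```python
# lookup_df expressed as a dict (C -> D)
lookup = {"a": "h1", "b": "h2", "c": "h3"}

def replace_tokens(b_value, mapping):
    """Split a comma-separated value, replace each token via the mapping,
    and re-join with commas; unknown tokens map to the literal 'EMPTY'."""
    return ",".join(mapping.get(tok, "EMPTY") for tok in b_value.split(","))

rows = [("foo", "a,b,c"), ("bar", "k,a,c"), ("faz", "b,a,f")]
for a, b in rows:
    print(a, replace_tokens(b, lookup))
# foo h1,h2,h3
# bar EMPTY,h1,h3
# faz h2,h1,EMPTY
```

Both answers below implement exactly this per-token lookup, just in a distributed way.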
You can split and explode column B, do a left join against lookup_df, then collect the D values per row and concatenate them with commas.
import pyspark.sql.functions as F

result = source_df.withColumn(
    'B_split',
    F.explode(F.split('B', ','))
).alias('s').join(
    lookup_df.alias('l'),
    F.expr('s.B_split = l.C'),
    'left'
).drop('C').na.fill(
    'EMPTY', ['D']
).groupBy(
    source_df.columns
).agg(
    F.concat_ws(',', F.collect_list('D')).alias('new_col')
)

result.show()
+---+-----+---+-----------+
| A| B| T| new_col|
+---+-----+---+-----------+
|foo|a,b,c|sam| h1,h2,h3|
|faz|b,a,f|sam|h2,h1,EMPTY|
|bar|k,a,c|bob|EMPTY,h1,h3|
+---+-----+---+-----------+
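Each stage of that pipeline can be emulated step by step in plain Python, which makes the explode → left join → groupBy flow concrete (no Spark required; the variable names here are only illustrative):

```python
source = [("foo", "a,b,c", "sam"), ("bar", "k,a,c", "bob"), ("faz", "b,a,f", "sam")]
lookup = {"a": "h1", "b": "h2", "c": "h3"}

# 1) explode: one row per token of B
exploded = [(a, b, t, tok) for a, b, t in source for tok in b.split(",")]

# 2) left join with lookup_df plus na.fill: unmatched tokens become "EMPTY"
joined = [(a, b, t, lookup.get(tok, "EMPTY")) for a, b, t, tok in exploded]

# 3) groupBy the original columns, collect_list the D values, concat_ws with ','
groups = {}
for a, b, t, d in joined:
    groups.setdefault((a, b, t), []).append(d)
result = [(a, b, t, ",".join(ds)) for (a, b, t), ds in groups.items()]
# result == [("foo", "a,b,c", "sam", "h1,h2,h3"),
#            ("bar", "k,a,c", "bob", "EMPTY,h1,h3"),
#            ("faz", "b,a,f", "sam", "h2,h1,EMPTY")]
```

One caveat with the real Spark version: `collect_list` does not guarantee element order after a shuffle, so `new_col` may not always line up with the token order in B. The map-based approach in the next answer sidesteps that, since `transform` walks the split of B in order.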
Here's another way that avoids exploding the first dataframe. Join directly using a LIKE condition of the form `',B,' LIKE '%,C,%'`. Then group by A to collect the matched (C, D) pairs into a map column, and use transform on the split of column B to look up the replacement for each element.
Finally, the array_join function turns the array returned by transform into a comma-separated string, with null values replaced by EMPTY:
from pyspark.sql import functions as F

df1 = source_df.join(
    lookup_df,
    F.expr("concat(',', B, ',') like concat('%,', C, ',%')"),
    "left"
).groupby("A").agg(
    *[F.first(c).alias(c) for c in source_df.columns if c != "A"],
    F.map_from_entries(
        # guard against rows where no token matches at all: the left join
        # yields C = NULL there, and map_from_entries rejects null keys
        # (collect_list silently drops the null produced by when())
        F.collect_list(F.when(F.col("C").isNotNull(), F.struct("C", "D")))
    ).alias("mappings")
).select(
    F.col("A"),
    F.col("B"),
    F.expr("array_join(transform(split(B, ','), x -> mappings[x]), ',', 'EMPTY')").alias("new_col"),
    *[F.col(c) for c in source_df.columns if c not in ("A", "B")]
)

df1.show()
#+---+-----+-----------+---+
#| A| B| new_col| T|
#+---+-----+-----------+---+
#|faz|b,a,f|h2,h1,EMPTY|sam|
#|bar|k,a,c|EMPTY,h1,h3|bob|
#|foo|a,b,c| h1,h2,h3|sam|
#+---+-----+-----------+---+
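The comma-padding in the join condition matters: a plain substring test would let a key like `a` match inside a longer token such as `ab`. Wrapping B as `,B,` and the key as `,C,` restricts the LIKE to whole tokens. A small pure-Python check of that logic (the helper name `token_match` is only illustrative):

```python
def token_match(b_value, key):
    """Emulates: concat(',', B, ',') LIKE concat('%,', C, ',%')"""
    return ("," + key + ",") in ("," + b_value + ",")

print(token_match("ab,c", "a"))   # False -- plain 'a' in 'ab,c' would wrongly be True
print(token_match("a,b,c", "a"))  # True
```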