
Create a new column by replacing comma-separated column's values with a lookup based on another dataframe

I have a PySpark dataframe (source_df) with a column whose values are comma-separated. I am trying to replace each of those values with a lookup from another dataframe (lookup_df), using EMPTY where there is no match.

source_df

A      B      T   ... followed by N unrelated columns...
foo    a,b,c  sam
bar    k,a,c  bob
faz    b,a,f  sam

lookup_df

C D
a h1
b h2
c h3

output dataframe:

A   T     B      new_col     ... followed by N unrelated columns...
foo sam   a,b,c  h1,h2,h3
bar bob   k,a,c  EMPTY,h1,h3
faz sam   b,a,f  h2,h1,EMPTY

Column A is a primary key and is always unique. Column T is unique for a given value of A.

jamesU asked Nov 28 '25 03:11



2 Answers

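You can split column B, explode it with posexplode so that each element keeps its position, and left join the result to lookup_df. Then collect the D values for each row, sort them by position, and concatenate them with commas. The sort matters because collect_list does not guarantee any order after a shuffle.

import pyspark.sql.functions as F

# One row per element of B, together with the element's position in the list
exploded = source_df.select(
    '*',
    F.posexplode(F.split('B', ',')).alias('pos', 'B_split')
)

result = exploded.alias('s').join(
    lookup_df.alias('l'),
    F.expr('s.B_split = l.C'),
    'left'
).groupBy(
    source_df.columns
).agg(
    # Collect (pos, D) pairs, with EMPTY for unmatched elements, sorted by pos
    F.sort_array(F.collect_list(
        F.struct('pos', F.coalesce('D', F.lit('EMPTY')).alias('D'))
    )).alias('pairs')
).withColumn(
    'new_col', F.concat_ws(',', F.col('pairs.D'))
).drop('pairs')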

result.show()
+---+-----+---+-----------+
|  A|    B|  T|    new_col|
+---+-----+---+-----------+
|foo|a,b,c|sam|   h1,h2,h3|
|faz|b,a,f|sam|h2,h1,EMPTY|
|bar|k,a,c|bob|EMPTY,h1,h3|
+---+-----+---+-----------+
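
If you want to check the Spark result against a small sample, the same split, lookup, and rejoin steps can be modeled in plain Python. This is only a sketch for testing, not part of the Spark job: `replace_with_lookup` and its `default` parameter are names made up for illustration, and the dict stands in for lookup_df collected to the driver.

```python
def replace_with_lookup(value, lookup, default="EMPTY"):
    """Map each comma-separated element through `lookup`, preserving order."""
    return ",".join(lookup.get(item, default) for item in value.split(","))


# Hypothetical in-memory copies of lookup_df and source_df from the question
lookup = {"a": "h1", "b": "h2", "c": "h3"}
rows = [
    ("foo", "a,b,c", "sam"),
    ("bar", "k,a,c", "bob"),
    ("faz", "b,a,f", "sam"),
]

# Append new_col to every row, like the aggregated Spark result
result = [(a, b, t, replace_with_lookup(b, lookup)) for a, b, t in rows]

for row in result:
    print(row)
```

Comparing this list with `result.collect()` on the same sample is a quick way to confirm that the element order in new_col follows the order in B.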
mck answered Nov 30 '25 18:11



Here's another way that avoids exploding the first dataframe. Join directly with LIKE, wrapping both sides in commas so that only whole elements match: concat(',', B, ',') LIKE concat('%,', C, ',%'). Then group by A to collect the matched (C, D) pairs into a map column, and use transform on the split of column B to look up the replacement for each element.

Finally, array_join turns the array returned by transform into a comma-separated string, with null values replaced by EMPTY:

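from pyspark.sql import functions as F

df1 = source_df.join(
    lookup_df,
    # Wrap both sides in commas so that only whole elements of B match C
    F.expr("concat(',', B, ',') like concat('%,', C, ',%')"),
    "left"
).groupby("A").agg(
    # A is unique, so first() simply carries the other columns through
    *[F.first(c).alias(c) for c in source_df.columns if c != "A"],
    F.map_from_entries(
        F.collect_list(
            # Skip unmatched rows: a null C cannot be used as a map key
            F.when(F.col("C").isNotNull(), F.struct(F.col("C"), F.col("D")))
        )
    ).alias("mappings")
).select(
    F.col("A"),
    F.col("B"),
    # Look up each element of B; array_join writes EMPTY for missing keys
    F.expr("array_join(transform(split(B, ','), x -> mappings[x]), ',', 'EMPTY')").alias("new_col"),
    *[F.col(c) for c in source_df.columns if c not in ("A", "B")]
)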

df1.show()

#+---+-----+-----------+---+
#|  A|    B|    new_col|  T|
#+---+-----+-----------+---+
#|faz|b,a,f|h2,h1,EMPTY|sam|
#|bar|k,a,c|EMPTY,h1,h3|bob|
#|foo|a,b,c|   h1,h2,h3|sam|
#+---+-----+-----------+---+
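
To see why the join condition wraps both sides in commas, here is a plain-Python model of the match. It is a sketch under assumptions: `matches_whole_element` and `matches_substring` are hypothetical helper names, and Python's `in` only approximates LIKE, since LIKE would also treat any `%` or `_` inside C as a wildcard.

```python
def matches_whole_element(b_value, c_value):
    """Model of concat(',', B, ',') LIKE concat('%,', C, ',%') for plain strings."""
    return f",{c_value}," in f",{b_value},"


def matches_substring(b_value, c_value):
    """Naive model of B LIKE concat('%', C, '%'), without the comma wrapping."""
    return c_value in b_value


# (B value, C value) pairs; "ab,c" is a made-up value to show a false positive
checks = [("k,a,c", "a"), ("ab,c", "a"), ("b,a,f", "f"), ("b,a,f", "h")]

for b_value, c_value in checks:
    print(
        b_value,
        c_value,
        matches_whole_element(b_value, c_value),
        matches_substring(b_value, c_value),
    )
```

Without the wrapping, a lookup key such as a would also join to a row whose B contains ab, which would put the wrong entry into the mappings column.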
blackbishop answered Nov 30 '25 17:11



