
PySpark: custom function in aggregation on grouped data

I have a PySpark DataFrame containing rows such as

Row(id='id1', type='A', status='H', keywords=['k1', 'k2', 'k3'])

Status is a binary option ('S'/'H'). What I need to do is compute the ratio of occurrences of status S for each keyword, per id and type. The ratio is

s/(s+h)

where s and h are the occurrence counts. So for instance, if keyword k1 occurs 2 times as S and 3 times as H in type A, I'd want 2/5 = 0.4 for it in that type, and my final output would ideally be

Row(id='id1', type='A', keyword='k1', ratio=0.4)

I was thinking this would have to go through several steps, and I'd be happy computing the occurrences of S and H first and then adding a further column with the ratio of the two.

But how would I compute those occurrences after running a groupBy on 'id', 'type' and 'status'? Is there a way to run agg with a custom function?
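For reference, a rough sketch of the two-step approach I had in mind (column names taken from the example row above; whether this is the idiomatic way is exactly what I'm unsure about):

from pyspark.sql.functions import explode, col, when, sum as sum_

# Count S and H occurrences per (id, type, keyword) ...
counts = (df
    .withColumn("keyword", explode("keywords"))
    .groupBy("id", "type", "keyword")
    .agg(
        sum_(when(col("status") == "S", 1).otherwise(0)).alias("s"),
        sum_(when(col("status") == "H", 1).otherwise(0)).alias("h")))

# ... then derive the ratio column from the two counts
result = counts.withColumn("ratio", col("s") / (col("s") + col("h")))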

asked Mar 13 '23 by mar tin


1 Answer

Something like this should do the trick:

from pyspark.sql.functions import explode, avg, col

ratio = avg(
    # If status is "S" then 1.0, else 0.0
    (col("status") == "S").cast("double")
).alias("ratio")

(df
    .withColumn("keyword", explode("keywords"))
    .groupBy("id", "type", "keyword")
    .agg(ratio))
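Casting the comparison turns status "S" into 1.0 and anything else into 0.0, so the group average is exactly s / (s + h). A quick sanity check with made-up sample data (an existing SparkSession named spark is assumed):

from pyspark.sql import Row

sample = spark.createDataFrame([
    Row(id='id1', type='A', status='S', keywords=['k1']),
    Row(id='id1', type='A', status='S', keywords=['k1']),
    Row(id='id1', type='A', status='H', keywords=['k1', 'k2']),
])

(sample
    .withColumn("keyword", explode("keywords"))
    .groupBy("id", "type", "keyword")
    .agg(ratio)
    .show())
# k1 -> 2 / (2 + 1) ≈ 0.667, k2 -> 0 / (0 + 1) = 0.0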
answered Mar 16 '23 by zero323