
PySpark divide column by its sum [duplicate]

I am trying to divide columns in PySpark by their respective sums. My dataframe (using only one column here) looks like this:

event_rates = [[1,10.461016949152542], [2, 10.38953488372093], [3, 10.609418282548477]]
event_rates = spark.createDataFrame(event_rates, ['cluster_id','mean_encoded'])
event_rates.show()

+----------+------------------+
|cluster_id|      mean_encoded|
+----------+------------------+
|         1|10.461016949152542|
|         2| 10.38953488372093|
|         3|10.609418282548477|
+----------+------------------+

I tried two approaches, but both failed. First, I tried:

from pyspark.sql.functions import sum as spark_sum
cols = event_rates.columns[1:]
for each in cols:
    event_rates = event_rates.withColumn(each+"_scaled", event_rates[each]/spark_sum(event_rates[each]))

This gives me the following error

org.apache.spark.sql.AnalysisException: grouping expressions sequence is empty, and '`cluster_id`' is not an aggregate function. Wrap '((`mean_encoded` / sum(`mean_encoded`)) AS `mean_encoded_scaled`)' in windowing function(s) or wrap '`cluster_id`' in first() (or first_value) if you don't care which value you get.;;
Aggregate [cluster_id#22356L, mean_encoded#22357, (mean_encoded#22357 / sum(mean_encoded#22357)) AS mean_encoded_scaled#2

Then, following the question here, I tried the following:

from pyspark.sql.functions import broadcast

stats = event_rates.agg([spark_sum(x).alias(x + '_sum') for x in cols])
event_rates = event_rates.join(broadcast(stats))
exprs = [event_rates[x] / event_rates[x + '_sum'] for x in cols]
event_rates.select(exprs)

But I get an error from the first line stating

AssertionError: all exprs should be Column

How do I get around this?

asked Oct 17 '25 by Clock Slave


1 Answer

This is an example of how to divide the column mean_encoded by its sum. You need to compute the sum of the column first, then crossJoin it back to the original dataframe. After that, you can divide any column by its sum.

import pyspark.sql.functions as fn

# Compute the column total (a one-row dataframe) and attach it to every row via crossJoin
event_rates = event_rates.crossJoin(
    event_rates.groupby().agg(fn.sum('mean_encoded').alias('sum_mean_encoded')))

# Divide the column by its total
event_rates_div = event_rates.select('cluster_id',
                                     'mean_encoded',
                                     fn.col('mean_encoded') / fn.col('sum_mean_encoded'))

Output

+----------+------------------+---------------------------------+
|cluster_id|      mean_encoded|(mean_encoded / sum_mean_encoded)|
+----------+------------------+---------------------------------+
|         1|10.461016949152542|               0.3325183371367686|
|         2| 10.38953488372093|               0.3302461777809474|
|         3|10.609418282548477|               0.3372354850822839|
+----------+------------------+---------------------------------+
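
As a side note, the "wrap in windowing function(s)" hint from the error in the question should also work. Here is a minimal sketch using an empty window over the whole dataframe (this moves all rows into a single partition, which is fine for a small aggregate like this):

from pyspark.sql import Window
import pyspark.sql.functions as fn

# An empty partitionBy() puts every row in one window, so sum() gives the column total
w = Window.partitionBy()
event_rates_scaled = event_rates.withColumn(
    'mean_encoded_scaled',
    fn.col('mean_encoded') / fn.sum('mean_encoded').over(w))

The AssertionError from the second attempt in the question appears to come from passing a Python list to agg; unpacking it with agg(*[spark_sum(x).alias(x + '_sum') for x in cols]) avoids that particular error.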

answered Oct 20 '25 by titipata


