I am trying to divide columns in PySpark by their respective sums. My dataframe (using only one column here) looks like this:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
event_rates = [[1, 10.461016949152542], [2, 10.38953488372093], [3, 10.609418282548477]]
event_rates = spark.createDataFrame(event_rates, ['cluster_id', 'mean_encoded'])
event_rates.show()
+----------+------------------+
|cluster_id| mean_encoded|
+----------+------------------+
| 1|10.461016949152542|
| 2| 10.38953488372093|
| 3|10.609418282548477|
+----------+------------------+
I tried two methods, but both failed. The first:
from pyspark.sql.functions import sum as spark_sum
cols = event_rates.columns[1:]
for each in cols:
    event_rates = event_rates.withColumn(each + "_scaled", event_rates[each] / spark_sum(event_rates[each]))
This gives me the following error:
org.apache.spark.sql.AnalysisException: grouping expressions sequence is empty, and '`cluster_id`' is not an aggregate function. Wrap '((`mean_encoded` / sum(`mean_encoded`)) AS `mean_encoded_scaled`)' in windowing function(s) or wrap '`cluster_id`' in first() (or first_value) if you don't care which value you get.;;
Aggregate [cluster_id#22356L, mean_encoded#22357, (mean_encoded#22357 / sum(mean_encoded#22357)) AS mean_encoded_scaled#2
Then, following the question here, I tried this:
stats = (event_rates.agg([spark_sum(x).alias(x + '_sum') for x in cols]))
event_rates = event_rates.join(broadcast(stats))
exprs = [event_rates[x] / event_rates[event_rates + '_sum'] for x in cols]
event_rates.select(exprs)
But the first line fails with:
AssertionError: all exprs should be Column
How do I get around this?
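Here is an example of how to divide the column mean_encoded by its sum. sum() is an aggregate function, so it cannot be combined with ordinary per-row columns inside withColumn without a groupBy or a window. Instead, compute the sum first (a one-row dataframe), then crossJoin it back onto the original dataframe. After that, you can divide any column by its sum.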
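import pyspark.sql.functions as fn

# Aggregate over the whole dataframe (empty groupby) to get a one-row sum,
# then attach that value to every row with crossJoin
event_rates = event_rates.crossJoin(event_rates.groupby().agg(fn.sum('mean_encoded').alias('sum_mean_encoded')))

# Divide each value by the column total
event_rates_div = event_rates.select('cluster_id',
                                     'mean_encoded',
                                     fn.col('mean_encoded') / fn.col('sum_mean_encoded'))
event_rates_div.show()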
Output
+----------+------------------+---------------------------------+
|cluster_id| mean_encoded|(mean_encoded / sum_mean_encoded)|
+----------+------------------+---------------------------------+
| 1|10.461016949152542| 0.3325183371367686|
| 2| 10.38953488372093| 0.3302461777809474|
| 3|10.609418282548477| 0.3372354850822839|
+----------+------------------+---------------------------------+