I have a DataFrame which looks like this:
dSc TranAmount
1: 100021 79.64
2: 100021 79.64
3: 100021 0.16
4: 100022 11.65
5: 100022 0.36
6: 100022 0.47
7: 100025 0.17
8: 100037 0.27
9: 100056 0.27
10: 100063 0.13
11: 100079 0.13
12: 100091 0.15
13: 100101 0.22
14: 100108 0.14
15: 100109 0.04
Now I want to create a third column with the z-score of each TranAmount, which will be

(TranAmount - mean(TranAmount)) / stddev(TranAmount)

where the mean and standard deviation are computed over the rows of each dSc group.
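For concreteness, this is the same computation for a single group in plain Python (an illustrative check using the first group's values from the sample above, not Spark code):

import math

# TranAmount values for dSc == 100021 in the sample above
values = [79.64, 79.64, 0.16]

mean = sum(values) / len(values)
# population standard deviation: sqrt(E[x^2] - E[x]^2)
sd = math.sqrt(sum(v * v for v in values) / len(values) - mean ** 2)

# one z-score per row of the group
print([(v - mean) / sd for v in values])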
I can calculate the mean and standard deviation in Spark SQL:

(datafromdb
    .groupBy("dSc")
    .agg(func.avg("TranAmount"), func.stddev_pop("TranAmount")))
but I am at a loss as to how to add a third column with the z-score to the DataFrame. I would appreciate any pointers to the right way of achieving this.
You can, for example, compute the statistics and join them with the original data:
from pyspark.sql import functions as func

stats = (df.groupBy("dSc")
    .agg(
        func.stddev_pop("TranAmount").alias("sd"),
        func.avg("TranAmount").alias("avg")))

# broadcast the small per-group statistics table to avoid a shuffle
(df
    .join(func.broadcast(stats), ["dSc"])
    .select("dSc", "TranAmount",
            ((df.TranAmount - stats.avg) / stats.sd).alias("zscore")))
or use window functions with the population standard deviation formula:
from pyspark.sql.window import Window
import sys

def z_score_w(col, w):
    # population standard deviation via sqrt(E[x^2] - E[x]^2),
    # computed with plain window aggregates
    avg_ = func.avg(col).over(w)
    avg_sq = func.avg(col * col).over(w)
    sd_ = func.sqrt(avg_sq - avg_ * avg_)
    return (col - avg_) / sd_

# unbounded window covering every row of each dSc partition
w = Window.partitionBy("dSc").rowsBetween(-sys.maxsize, sys.maxsize)

df.withColumn("zscore", z_score_w(df.TranAmount, w))