Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to create a z-score in Spark SQL for each group

I have a dataframe which looks like this

        dSc     TranAmount
 1: 100021      79.64
 2: 100021      79.64
 3: 100021       0.16
 4: 100022      11.65
 5: 100022       0.36
 6: 100022       0.47
 7: 100025       0.17
 8: 100037       0.27
 9: 100056       0.27
10: 100063       0.13
11: 100079       0.13
12: 100091       0.15
13: 100101       0.22
14: 100108       0.14
15: 100109       0.04

Now I want to create a third column with the z-score of each TranAmount which will be

(TranAmount-mean(TranAmount))/StdDev(TranAmount)

here mean and standard deviation will be based on groups of each dSc

Now I can calculate mean and standard deviation in Spark SQL.

(datafromdb
  .groupBy("dSc")
  .agg(datafromdb.dSc, func.avg("TranAmount") ,func.stddev_pop("TranAmount")))

but I am at a loss on how to achieve a third column with the z-score in the data frame. I would appreciate any pointer to the right way of achieving this/

like image 913
Bg1850 Avatar asked Apr 23 '16 07:04

Bg1850


People also ask

How do you implement z scores?

How do you calculate the z-score? The formula for calculating a z-score is is z = (x-μ)/σ, where x is the raw score, μ is the population mean, and σ is the population standard deviation. As the formula shows, the z-score is simply the raw score minus the population mean, divided by the population standard deviation.

How does groupBy work in Spark?

The groupBy method is defined in the Dataset class. groupBy returns a RelationalGroupedDataset object where the agg() method is defined. Spark makes great use of object oriented programming! The RelationalGroupedDataset class also defines a sum() method that can be used to get the same result with less code.


1 Answers

You can for example compute statistics and join with the original data:

stats = (df.groupBy("dsc")
  .agg(
      func.stddev_pop("TranAmount").alias("sd"), 
      func.avg("TranAmount").alias("avg")))

df.join(broadcast(stats), ["dsc"])

(df
    .join(func.broadcast(stats), ["dsc"])
    .select("dsc", "TranAmount", (df.TranAmount - stats.avg) / stats.sd))

or use window functions with standard deviation formula:

from pyspark.sql.window import Window
import sys

def z_score_w(col, w):
    avg_ = func.avg(col).over(w)
    avg_sq = func.avg(col * col).over(w)
    sd_ = func.sqrt(avg_sq - avg_ * avg_)
    return (col - avg_) / sd_

w = Window().partitionBy("dsc").rowsBetween(-sys.maxsize, sys.maxsize)
df.withColumn("zscore", z_score_w(df.TranAmount, w))
like image 157
zero323 Avatar answered Oct 26 '22 06:10

zero323