
Calculate the standard deviation of grouped data in a Spark DataFrame

I have user logs that I have taken from a CSV file and converted into a DataFrame in order to leverage the SparkSQL querying features. A single user will create numerous entries per hour, and I would like to gather some basic statistical information for each user: really just the count of user instances, the average, and the standard deviation of several columns. I was able to quickly get the mean and count information by using groupBy($"user") and the aggregator with SparkSQL functions for count and avg:

val meanData = selectedData.groupBy($"user").agg(
  count($"logOn"),
  avg($"transaction"),
  avg($"submit"),
  avg($"submitsPerHour"),
  avg($"replies"),
  avg($"repliesPerHour"),
  avg($"duration"))

However, I cannot seem to find an equally elegant way to calculate the standard deviation. So far I can only calculate it by mapping to a (String, Double) pair and using the StatCounter().stdev utility:

val stdevduration = duration.groupByKey().mapValues(value => org.apache.spark.util.StatCounter(value).stdev) 

This returns an RDD however, and I would like to try and keep it all in a DataFrame for further queries to be possible on the returned data.
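(For reference, a minimal sketch of one way to bring such an RDD back into a DataFrame so it can be joined with meanData, assuming duration is an RDD[(String, Double)] and sqlContext.implicits._ is in scope; the column names here are illustrative:)

import sqlContext.implicits._  // enables .toDF on RDDs of tuples

// Turn the (user, stdev) pairs back into a DataFrame and join it
// onto the aggregated means computed above.
val stdevDF = stdevduration.toDF("user", "duration_sd")
val combined = meanData.join(stdevDF, "user")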

asked Aug 03 '15 by the3rdNotch



1 Answer

Spark 1.6+

You can use stddev_pop to compute population standard deviation and stddev / stddev_samp to compute unbiased sample standard deviation:

import org.apache.spark.sql.functions.{stddev_samp, stddev_pop}

selectedData.groupBy($"user").agg(stddev_pop($"duration"))
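If you also want the count and means from the original query in the same pass, the aggregations can be combined (a sketch; column names are taken from the question):

import org.apache.spark.sql.functions.{count, avg, stddev_samp, stddev_pop}

// Count, mean, and both flavors of standard deviation in one aggregation.
val stats = selectedData.groupBy($"user").agg(
  count($"logOn").alias("n"),
  avg($"duration").alias("duration_mean"),
  stddev_samp($"duration").alias("duration_sd_samp"),
  stddev_pop($"duration").alias("duration_sd_pop"))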

Spark 1.5 and below (the original answer):

Not so pretty and biased (same as the value returned from describe), but using the formula

σ = √( E[X²] − (E[X])² )

you can do something like this:

import org.apache.spark.sql.functions.sqrt

selectedData
  .groupBy($"user")
  .agg(sqrt(
      avg($"duration" * $"duration") -
      avg($"duration") * avg($"duration")
    ).alias("duration_sd"))

You can of course create a function to reduce the clutter:

import org.apache.spark.sql.Column

def mySd(col: Column): Column = {
  sqrt(avg(col * col) - avg(col) * avg(col))
}

df.groupBy($"user").agg(mySd($"duration").alias("duration_sd"))

It is also possible to use a Hive UDF:

df.registerTempTable("df")

sqlContext.sql("""SELECT user, stddev(duration)
                  FROM df
                  GROUP BY user""")
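The same Hive UDAF can also be called through the DataFrame API with callUDF, which avoids the SQL string entirely (a sketch; assumes sqlContext is a HiveContext so the Hive stddev function is visible):

import org.apache.spark.sql.functions.callUDF

// Invoke the Hive stddev UDAF by name from the DataFrame API.
df.groupBy($"user").agg(callUDF("stddev", $"duration").alias("duration_sd"))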

Source of the formula: https://en.wikipedia.org/wiki/Standard_deviation

answered Sep 19 '22 by zero323