I have user logs that I have taken from a CSV and converted into a DataFrame in order to leverage the Spark SQL querying features. A single user will create numerous entries per hour, and I would like to gather some basic statistical information for each user; really just the count of the user instances, the average, and the standard deviation of numerous columns. I was able to quickly get the mean and count information by using groupBy($"user") and agg with the Spark SQL count and avg functions:
val meanData = selectedData.groupBy($"user").agg(count($"logOn"), avg($"transaction"), avg($"submit"), avg($"submitsPerHour"), avg($"replies"), avg($"repliesPerHour"), avg($"duration"))
However, I cannot seem to find an equally elegant way to calculate the standard deviation. So far I can only calculate it by mapping to a (String, Double) pair RDD and using the StatCounter().stdev utility:
val stdevduration = duration.groupByKey().mapValues(value => org.apache.spark.util.StatCounter(value).stdev)
This returns an RDD, however, and I would like to keep everything in a DataFrame so that further queries are possible on the returned data.
agg is a DataFrame method that accepts those aggregate functions as arguments:

scala> my_df.agg(min("column"))
res0: org.apache.spark.sql.DataFrame = [min(column): double]
Spark 1.6+

You can use stddev_pop to compute the population standard deviation and stddev / stddev_samp to compute the unbiased sample standard deviation:
import org.apache.spark.sql.functions.{stddev_samp, stddev_pop}

selectedData.groupBy($"user").agg(stddev_pop($"duration"))
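For reference, a sketch of how the aggregation from the question could be extended to return the count, averages, and standard deviations in a single DataFrame (the column names come from the question; statsData is just an illustrative name, and stddev_samp is assumed here, with stddev_pop as the population alternative):

import org.apache.spark.sql.functions.{count, avg, stddev_samp}

// Same grouping as meanData in the question, with sample standard deviations added per column
val statsData = selectedData.groupBy($"user").agg(
  count($"logOn"),
  avg($"duration"), stddev_samp($"duration"),
  avg($"submitsPerHour"), stddev_samp($"submitsPerHour"),
  avg($"repliesPerHour"), stddev_samp($"repliesPerHour")
)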
Spark 1.5 and below (The original answer):
Not so pretty, and biased (the same as the value returned by describe), but using the formula

sd = sqrt(E[X^2] - (E[X])^2)

you can do something like this:

import org.apache.spark.sql.functions.{avg, sqrt}

selectedData
  .groupBy($"user")
  .agg(sqrt(avg($"duration" * $"duration") - avg($"duration") * avg($"duration")).alias("duration_sd"))
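As a quick sanity check of this formula, on Spark versions where stddev_pop is available (1.6+) the manual expression and the built-in should agree. A minimal sketch, assuming a SparkSession named spark (Spark 2.x style entry point) and using made-up toy values:

import org.apache.spark.sql.functions.{avg, sqrt, stddev_pop}
import spark.implicits._

// Toy data: population standard deviation of 2, 4, 6 is about 1.633
val sample = Seq(("alice", 2.0), ("alice", 4.0), ("alice", 6.0)).toDF("user", "duration")

sample.groupBy($"user").agg(
  sqrt(avg($"duration" * $"duration") - avg($"duration") * avg($"duration")).alias("manual_sd"),
  stddev_pop($"duration").alias("builtin_sd")
).show()
// Both columns should show roughly 1.633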
You can of course create a function to reduce the clutter:
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{avg, sqrt}

def mySd(col: Column): Column = {
  sqrt(avg(col * col) - avg(col) * avg(col))
}

df.groupBy($"user").agg(mySd($"duration").alias("duration_sd"))
It is also possible to use a Hive UDF:

df.registerTempTable("df")
sqlContext.sql("""SELECT user, stddev(duration) FROM df GROUP BY user""")
Source of the formula: https://en.wikipedia.org/wiki/Standard_deviation