
aggregate function Count usage with groupBy in Spark

I'm trying to perform multiple operations in a single line of code in PySpark, and I'm not sure whether that's possible in my case.

My intention is to avoid having to save intermediate output as a new DataFrame.

My current code is rather simple:

encodeUDF = udf(encode_time, StringType())

new_log_df.cache().withColumn('timePeriod', encodeUDF(col('START_TIME'))) \
  .groupBy('timePeriod') \
  .agg(
    mean('DOWNSTREAM_SIZE').alias("Mean"),
    stddev('DOWNSTREAM_SIZE').alias("Stddev")
  ) \
  .show(20, False)

My intention is to add count() after the groupBy, to get the number of records matching each value of the timePeriod column, printed/shown in the output.

When I try to use groupBy(..).count().agg(..) I get exceptions.

Is there any way to get both the count() output and the agg().show() output without splitting the code into two separate commands, e.g.:

new_log_df.withColumn(..).groupBy(..).count()
new_log_df.withColumn(..).groupBy(..).agg(..).show()

Or, better yet, a merged agg().show() output with an extra column stating the number of records matching each row's value, e.g.:

timePeriod | Mean | Stddev | Num Of Records
     X     |  10  |   20   |      315
Adiel asked Jan 27 '17


People also ask

How do you use groupBy and count in PySpark?

When we perform groupBy() on a PySpark DataFrame, it returns a GroupedData object which provides aggregate functions such as: count() – returns the number of rows for each group; mean() – returns the mean of values for each group; max() – returns the maximum of values for each group.
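As a minimal sketch of those calls (the DataFrame df, its 'dept' and 'salary' columns, and the sample values are all made up for illustration, not taken from the question):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("sales", 100), ("sales", 200), ("hr", 300)],
    ["dept", "salary"])

grouped = df.groupBy("dept")    # GroupedData object
grouped.count().show()          # number of rows per group
grouped.mean("salary").show()   # mean of 'salary' per group
grouped.max("salary").show()    # maximum of 'salary' per group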

How do you use the count function in Spark?

One approach is the select() method: to return the count for multiple columns, use the count() function inside select(), listing the column names separated by commas, where df is the input PySpark DataFrame and column_name is the column whose rows are counted.
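A rough sketch of that idea, reusing the hypothetical df from the previous snippet (the column names are again assumptions):

from pyspark.sql.functions import count

# count() inside select(), one call per column; each returns the number of non-null values
df.select(count("dept"), count("salary")).show()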

How do I use count in Spark DataFrame?

For counting the number of distinct rows, use distinct().count(), which returns the number of distinct rows of the DataFrame (stored here in a variable named 'row'). For counting the number of columns, use the length of df.columns.
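A short sketch of both counts, again with the hypothetical df from above:

row = df.distinct().count()   # number of distinct rows
cols = len(df.columns)        # number of columns
print(row, cols)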

How does groupBy work in Spark?

The groupBy method is defined in the Dataset class. groupBy returns a RelationalGroupedDataset object where the agg() method is defined. Spark makes great use of object oriented programming! The RelationalGroupedDataset class also defines a sum() method that can be used to get the same result with less code.


1 Answer

count() can be used inside agg(), since the groupBy expression is the same.

With Python

import pyspark.sql.functions as func

new_log_df.cache().withColumn("timePeriod", encodeUDF(new_log_df["START_TIME"])) \
  .groupBy("timePeriod") \
  .agg(
     func.mean("DOWNSTREAM_SIZE").alias("Mean"),
     func.stddev("DOWNSTREAM_SIZE").alias("Stddev"),
     func.count(func.lit(1)).alias("Num Of Records")
  ) \
  .show(20, False)

pySpark SQL functions doc

With Scala

import org.apache.spark.sql.functions._ // for count()

new_log_df.cache().withColumn("timePeriod", encodeUDF(col("START_TIME")))
  .groupBy("timePeriod")
  .agg(
     mean("DOWNSTREAM_SIZE").alias("Mean"),
     stddev("DOWNSTREAM_SIZE").alias("Stddev"),
     count(lit(1)).alias("Num Of Records")
  )
  .show(20, false)

count(lit(1)) counts every record in each group, which here is equal to count("timePeriod") as long as timePeriod is never null (see the PySpark illustration after the Java example).

With Java

import static org.apache.spark.sql.functions.*;

new_log_df.cache().withColumn("timePeriod", encodeUDF(col("START_TIME")))
  .groupBy("timePeriod")
  .agg(
     mean("DOWNSTREAM_SIZE").alias("Mean"),
     stddev("DOWNSTREAM_SIZE").alias("Stddev"),
     count(lit(1)).alias("Num Of Records")
  )
  .show(20, false);
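To make the count(lit(1)) versus count(column) distinction concrete, here is a small PySpark illustration on a made-up DataFrame containing a null; the names and data are assumptions for the sketch, not part of the answer:

from pyspark.sql import SparkSession
from pyspark.sql.functions import count, lit

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("X", 10), ("X", None), ("Y", 5)],
    ["timePeriod", "DOWNSTREAM_SIZE"])

df.groupBy("timePeriod").agg(
    count(lit(1)).alias("all_rows"),             # counts every row in the group
    count("DOWNSTREAM_SIZE").alias("non_null")   # skips nulls in that column
).show()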
mrsrinivas answered Sep 26 '22