
How to do aggregation on multiple columns at once in Spark

I have a DataFrame with multiple columns. I want to group by one of the columns and aggregate all the other columns at once. Say the table has 4 columns: cust_id, f1, f2, f3, and I want to group by cust_id and then get avg(f1), avg(f2) and avg(f3). The table will have many columns. Any hints?

The following code is a good start, but since I have many columns it may not be a good idea to write them all out manually.

df.groupBy("cust_id").agg(avg("f1"), avg("f2"), avg("f3"))
HHH asked Aug 12 '16



1 Answer

Maybe you can try mapping over a list of the column names:

import org.apache.spark.sql.functions.avg

val groupCol = "cust_id"

// Build one avg aggregation per non-grouping column
val aggCols = (df.columns.toSet - groupCol).map(
  colName => avg(colName).as(colName + "_avg")
).toList

// agg takes a first Column plus varargs, hence head/tail
df.groupBy(groupCol).agg(aggCols.head, aggCols.tail: _*)
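If it helps to see the moving parts without a Spark session, here is a plain-Scala sketch of the column-selection step. The column names are hypothetical stand-ins for df.columns, and the aggregation expressions are rendered as strings purely for illustration:

```scala
// Stand-in for df.columns; no SparkSession required.
val allCols = Seq("cust_id", "f1", "f2", "f3")
val groupCol = "cust_id"

// Drop the grouping column, then map each remaining name to an
// aggregation expression (rendered here as a SQL-like string).
val aggExprs = (allCols.toSet - groupCol).toList.sorted
  .map(c => s"avg($c) AS ${c}_avg")
// aggExprs: List("avg(f1) AS f1_avg", "avg(f2) AS f2_avg", "avg(f3) AS f3_avg")
```

With a real DataFrame the string template would simply be replaced by avg(colName).as(...), exactly as in the answer's code.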

Alternatively, if needed, you can also match on the schema and build the aggregations based on each column's type:

import org.apache.spark.sql.functions.{avg, first}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField}

val aggCols = df.schema.collect {
  case StructField(colName, IntegerType, _, _) => avg(colName).as(colName + "_avg")
  case StructField(colName, StringType, _, _)  => first(colName).as(colName + "_first")
}
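The same dispatch-by-type idea can be sketched in plain Scala, with hypothetical (name, type) pairs standing in for the schema's StructFields and string expressions standing in for Columns:

```scala
// Hypothetical (name, type) pairs standing in for df.schema's StructFields.
val schema = Seq(("cust_id", "string"), ("f1", "int"), ("f2", "int"), ("note", "string"))

// Choose an aggregation per column based on its type, skipping the key.
val typedAggs = schema.collect {
  case (name, "int")    if name != "cust_id" => s"avg($name) AS ${name}_avg"
  case (name, "string") if name != "cust_id" => s"first($name) AS ${name}_first"
}
// typedAggs: Seq("avg(f1) AS f1_avg", "avg(f2) AS f2_avg", "first(note) AS note_first")
```

Note the guard on the grouping column: without it, a string-typed cust_id would also pick up a redundant first(cust_id) aggregation.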
Daniel de Paula answered Sep 28 '22