I have a dataframe with multiple columns. I want to group by one of the columns and aggregate all the other columns at once. Say the table has 4 columns, cust_id, f1, f2, f3, and I want to group by cust_id and then get avg(f1), avg(f2), and avg(f3). The table will have many columns. Any hints?
The following code is a good start, but since I have many columns it may not be a good idea to write them all out manually:
df.groupBy("cust_id").agg(sum("f1"), sum("f2"), sum("f3"))
Maybe you can try mapping over the list of column names:
import org.apache.spark.sql.functions.avg

val groupCol = "cust_id"
// Build an avg(...) expression for every column except the grouping column
val aggCols = (df.columns.toSet - groupCol).map(
  colName => avg(colName).as(colName + "_avg")
).toList
// agg takes one expression plus varargs, hence the head/tail splat
df.groupBy(groupCol).agg(aggCols.head, aggCols.tail: _*)
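If the default output names (like avg(f1)) are acceptable, agg also has an overload that takes a Map from column name to aggregate function name, which avoids the head/tail splat. A minimal sketch, reusing df and groupCol from above:

// Alternative: agg(Map[String, String]) maps each column name to an
// aggregate function name; result columns default to names like "avg(f1)".
val aggExprs = (df.columns.toSet - groupCol).map(c => c -> "avg").toMap
df.groupBy(groupCol).agg(aggExprs)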
Alternatively, if needed, you can also match on the schema and build the aggregations based on the column type:
import org.apache.spark.sql.functions.first
import org.apache.spark.sql.types.{IntegerType, StringType, StructField}

val aggCols = df.schema.collect {
  case StructField(colName, IntegerType, _, _) => avg(colName).as(colName + "_avg")   // average numeric columns
  case StructField(colName, StringType, _, _) => first(colName).as(colName + "_first") // first value per group for strings
}
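One caveat: the pattern match above would also pick up the grouping column if it happens to be an integer or string, so it is worth filtering it out before applying the expressions. A sketch, reusing df and groupCol from the first snippet:

// Drop the grouping column before matching on the schema, then splat
// the resulting expressions into agg as before.
val typedAggCols = df.schema
  .filterNot(_.name == groupCol)
  .collect {
    case StructField(colName, IntegerType, _, _) => avg(colName).as(colName + "_avg")
    case StructField(colName, StringType, _, _) => first(colName).as(colName + "_first")
  }
  .toList

df.groupBy(groupCol).agg(typedAggCols.head, typedAggCols.tail: _*)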