 

Multiple Aggregate operations on the same column of a spark dataframe

I have three arrays of string type containing the following information:

  • groupBy array: containing names of the columns I want to group my data by.
  • aggregate array: containing names of columns I want to aggregate.
  • operations array: containing the aggregate operations I want to perform.

I am trying to use Spark DataFrames to achieve this. Spark DataFrames provide an agg() method where you can pass a Map[String, String] (of column name and the respective aggregate operation) as input; however, I want to perform several different aggregation operations on the same column of the data. Any suggestions on how to achieve this?
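
A minimal sketch of why the map form does not cover this case (the toy column names k and v and a SparkSession named spark are assumptions for illustration): a map can hold only one entry per column name, so only one operation per column survives.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 3.0), (1, 3.0), (2, -5.0)], ["k", "v"])

# duplicate keys in a dict keep only the last entry,
# so only max(v) is computed here and min(v) is silently dropped
df.groupBy("k").agg({"v": "min", "v": "max"}).show()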

asked Jan 22 '16 by Richa Banker
People also ask

How do you do multiple aggregate functions in PySpark?

In PySpark, groupBy() collects identical values into groups on a DataFrame so that aggregate functions can be applied to the grouped data. Calling agg() on the grouped result lets you run multiple aggregations at once.
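
A minimal sketch of that pattern, assuming an active SparkSession named spark and toy columns k and v:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 3.0), (1, 3.0), (2, -5.0)], ["k", "v"])

# group by k, then apply several aggregate functions to the same column v
df.groupBy("k").agg(F.min("v"), F.max("v"), F.avg("v")).show()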

What is .AGG in spark?

agg (Java-specific): compute aggregates by specifying a map from column name to aggregate method. The resulting DataFrame will also contain the grouping columns. The available aggregate methods are avg, max, min, sum, count.
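
In PySpark the equivalent map-style call takes a plain dict; a small sketch, assuming a DataFrame df with columns k and v as above (note this form allows only one operation per column):

# one aggregate method per column when using the dict form
df.groupBy("k").agg({"v": "max"}).show()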


1 Answer

Scala:

You can for example map over a list of functions with a defined mapping from name to function:

import org.apache.spark.sql.functions.{col, min, max, avg}
import org.apache.spark.sql.Column

val df = Seq((1L, 3.0), (1L, 3.0), (2L, -5.0)).toDF("k", "v")

// map from operation name to the corresponding aggregate function
val mapping: Map[String, Column => Column] = Map(
  "min" -> min, "max" -> max, "mean" -> avg)

val groupBy = Seq("k")
val aggregate = Seq("v")
val operations = Seq("min", "max", "mean")

// build one Column expression per (aggregate column, operation) pair
val exprs = aggregate.flatMap(c => operations.map(f => mapping(f)(col(c))))

df.groupBy(groupBy.map(col): _*).agg(exprs.head, exprs.tail: _*).show
// +---+------+------+------+
// |  k|min(v)|max(v)|avg(v)|
// +---+------+------+------+
// |  1|   3.0|   3.0|   3.0|
// |  2|  -5.0|  -5.0|  -5.0|
// +---+------+------+------+

or

df.groupBy(groupBy.head, groupBy.tail: _*).agg(exprs.head, exprs.tail: _*).show 

Unfortunately the parser used internally by SQLContext is not exposed publicly, but you can always try to build plain SQL queries:

df.registerTempTable("df")

val groupExprs = groupBy.mkString(",")
val aggExprs = aggregate.flatMap(c => operations.map(
  f => s"$f($c) AS ${c}_${f}")
).mkString(",")

sqlContext.sql(s"SELECT $groupExprs, $aggExprs FROM df GROUP BY $groupExprs")

Python:

from pyspark.sql.functions import mean, sum, max, col

df = sc.parallelize([(1, 3.0), (1, 3.0), (2, -5.0)]).toDF(["k", "v"])
groupBy = ["k"]
aggregate = ["v"]
funs = [mean, sum, max]

exprs = [f(col(c)) for f in funs for c in aggregate]

df.groupby(groupBy).agg(*exprs)
# or equivalently
df.groupby(*groupBy).agg(*exprs)
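
If the default column names (avg(v), sum(v), ...) are awkward downstream, one optional variation on the snippet above is to alias each expression; the column_operation naming scheme here is just an assumption for illustration:

# reuse groupBy, aggregate, df and the imported functions from above
named_exprs = [f(col(c)).alias("{}_{}".format(c, name))
               for name, f in [("mean", mean), ("sum", sum), ("max", max)]
               for c in aggregate]
df.groupby(*groupBy).agg(*named_exprs)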

See also:

  • Spark SQL: apply aggregate functions to a list of columns
answered Oct 06 '22 by zero323