I have three Arrays of string type containing the following information:

groupBy array: names of the columns I want to group my data by.
aggregate array: names of the columns I want to aggregate.
operations array: the aggregate operations I want to perform.
I am trying to use Spark DataFrames to achieve this. Spark DataFrames provide an agg() to which you can pass a Map[String, String] (of column name and the respective aggregate operation) as input; however, I want to perform different aggregation operations on the same column of the data. Any suggestions on how to achieve this?
In PySpark, groupBy() is used to collect identical data into groups on the DataFrame and then apply aggregate functions to the grouped data, which is how multiple aggregations can be performed at once; column_name_group below stands for the column to be grouped.
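A minimal sketch of that pattern, assuming a DataFrame named dataframe; column_name_group and column_name are placeholder names, not columns from my data:

from pyspark.sql import functions as F

# group on the placeholder column and apply several aggregates at once
dataframe.groupBy("column_name_group").agg(
    F.sum("column_name"),
    F.avg("column_name"),
    F.max("column_name"),
).show()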
agg (Java-specific): compute aggregates by specifying a map from column name to aggregate method. The resulting DataFrame will also contain the grouping columns. The available aggregate methods are avg, max, min, sum, count.
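The PySpark analogue of that map is a dict, and it makes the limitation easy to see: the column name is the key, so only one operation per column can be expressed. A small sketch, assuming a DataFrame df with columns k and v:

# dict-based agg: one aggregate per column, because the column name is the key
df.groupBy("k").agg({"v": "min"}).show()
# a literal like {"v": "min", "v": "max"} collapses to {"v": "max"},
# so two different operations on the same column cannot be requested this way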
Scala:
You can, for example, map over a list of functions with a defined mapping
from name to function:
import org.apache.spark.sql.functions.{col, min, max, mean}
import org.apache.spark.sql.Column

val df = Seq((1L, 3.0), (1L, 3.0), (2L, -5.0)).toDF("k", "v")

val mapping: Map[String, Column => Column] = Map(
  "min" -> min, "max" -> max, "mean" -> mean)

val groupBy = Seq("k")
val aggregate = Seq("v")
val operations = Seq("min", "max", "mean")

val exprs = aggregate.flatMap(c => operations.map(f => mapping(f)(col(c))))

df.groupBy(groupBy.map(col): _*).agg(exprs.head, exprs.tail: _*).show

// +---+------+------+------+
// |  k|min(v)|max(v)|avg(v)|
// +---+------+------+------+
// |  1|   3.0|   3.0|   3.0|
// |  2|  -5.0|  -5.0|  -5.0|
// +---+------+------+------+
or
df.groupBy(groupBy.head, groupBy.tail: _*).agg(exprs.head, exprs.tail: _*).show
Unfortunately, the parser used internally by SQLContext is not exposed publicly,
but you can always try to build plain SQL queries:
df.registerTempTable("df")

val groupExprs = groupBy.mkString(",")
val aggExprs = aggregate.flatMap(c => operations.map(
  f => s"$f($c) AS ${c}_${f}")
).mkString(",")

sqlContext.sql(s"SELECT $groupExprs, $aggExprs FROM df GROUP BY $groupExprs")
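With the example values above (groupBy = Seq("k"), aggregate = Seq("v"), operations = Seq("min", "max", "mean")) this should build a query along the lines of SELECT k, min(v) AS v_min,max(v) AS v_max,mean(v) AS v_mean FROM df GROUP BY k.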
Python:
from pyspark.sql.functions import mean, sum, max, col

df = sc.parallelize([(1, 3.0), (1, 3.0), (2, -5.0)]).toDF(["k", "v"])
groupBy = ["k"]
aggregate = ["v"]
funs = [mean, sum, max]

exprs = [f(col(c)) for f in funs for c in aggregate]

df.groupby(groupBy).agg(*exprs)

# or equivalent
df.groupby(*groupBy).agg(*exprs)
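If you want readable column names instead of the generated avg(v), sum(v), max(v) headers, the expressions can be aliased; a small sketch using the same variables:

# optional: alias each aggregate as <column>_<function>, e.g. v_mean, v_sum, v_max
exprs = [f(col(c)).alias("{}_{}".format(c, f.__name__)) for f in funs for c in aggregate]
df.groupby(*groupBy).agg(*exprs).show()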