
Custom aggregations for Spark dataframes

Is there a way to specify a custom aggregation function for Spark DataFrames? Given a table with two columns, id and value, I would like to groupBy id and aggregate the values into a list for each id, like so:

from:

john | tomato
john | carrot
bill | apple
john | banana
bill | taco

to:

john | tomato, carrot, banana
bill | apple, taco

Is this possible with DataFrames? I am asking about DataFrames because I am reading the data from an ORC file, which is loaded as a DataFrame, and I would think it is inefficient to convert it to an RDD.

anthonybell asked May 31 '26 21:05



1 Answer

I'd simply go with the built-in collect_list function:

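// In spark-shell, spark.implicits._ is already in scope; in a standalone
// application, add `import spark.implicits._` for toDF and the $"..." syntax.
import org.apache.spark.sql.functions.collect_list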
val df = Seq(("john", "tomato"), ("john", "carrot"), 
             ("bill", "apple"), ("john", "banana"), 
             ("bill", "taco")).toDF("id", "value")
// df: org.apache.spark.sql.DataFrame = [id: string, value: string]

val aggDf = df.groupBy($"id").agg(collect_list($"value").as("values"))
// aggDf: org.apache.spark.sql.DataFrame = [id: string, values: array<string>]

aggDf.show(false)
// +----+------------------------+
// |id  |values                  |
// +----+------------------------+
// |john|[tomato, carrot, banana]|
// |bill|[apple, taco]           |
// +----+------------------------+

You don't even need to touch the underlying RDD: the grouping and aggregation both happen at the DataFrame level.
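If the goal is the exact comma-separated string shown in the question rather than an array column, one possible approach is to wrap collect_list in concat_ws. The following is a minimal sketch, not a definitive implementation: the object name CollectToString, the helper joinValues, and the local SparkSession settings are hypothetical and only there to make the example self-contained.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list, concat_ws}

// Hypothetical wrapper object, used only to make the sketch self-contained.
object CollectToString {

  // Groups by "id" and joins the collected "value" entries with ", ".
  // Assumes the input has string columns named "id" and "value".
  def joinValues(df: DataFrame): DataFrame =
    df.groupBy(col("id"))
      .agg(concat_ws(", ", collect_list(col("value"))).as("values"))

  def main(args: Array[String]): Unit = {
    // Local session for illustration; in spark-shell `spark` already exists.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("collect-to-string")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(("john", "tomato"), ("john", "carrot"),
                 ("bill", "apple"), ("john", "banana"),
                 ("bill", "taco")).toDF("id", "value")

    // The result has schema [id: string, values: string].
    joinValues(df).show(false)

    spark.stop()
  }
}
```

Note that collect_list does not guarantee the order of the collected elements after a shuffle, so the values within each string may not appear in input order.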

eliasah answered Jun 02 '26 12:06
