Spark multiple dynamic aggregate functions, countDistinct not working

Aggregation on Spark dataframe with multiple dynamic aggregation operations.

I want to do aggregation on a Spark dataframe using Scala with multiple dynamic aggregation operations (passed by the user in JSON). I'm converting the JSON to a Map.

Below is some sample data:

colA    colB    colC    colD
1       2       3       4
5       6       7       8
9       10      11      12

The Spark aggregation code that I am using:

var cols = Seq("colA", "colB")
var aggFuncMap = Map("colC"-> "sum", "colD"-> "countDistinct")
var aggregatedDF = currentDF.groupBy(cols.head, cols.tail: _*).agg(aggFuncMap)

I have to pass aggFuncMap as a Map only, so that the user can pass any number of aggregations through the JSON configuration.
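
For reference, the conversion from JSON to the Map looks roughly like this (the JSON shape and the ujson library are only examples; my actual configuration is more involved):

// Example configuration: a flat JSON object mapping column names to aggregation
// functions (requires the com.lihaoyi ujson library, used here purely as an example).
val configJson = """{"colC": "sum", "colD": "countDistinct"}"""

val aggFuncMap: Map[String, String] =
  ujson.read(configJson).obj.map { case (col, func) => col -> func.str }.toMap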

The above code is working fine for some aggregations, including sum, min, max, avg and count.

Unfortunately, this code is not working for countDistinct (maybe because it is camel case?).

When running the above code, I am getting this error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Undefined function: 'countdistinct'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'

Any help will be appreciated!

asked by Tarun Khaneja


1 Answer

It's currently not possible to use agg with countDistinct inside a Map. From the documentation we see:

The available aggregate methods are avg, max, min, sum, count.


A possible fix would be to change the Map to a Seq[Column],

import org.apache.spark.sql.functions.{countDistinct, sum}

val cols = Seq("colA", "colB")
val aggFuncs = Seq(sum("colC"), countDistinct("colD"))
val df2 = df.groupBy(cols.head, cols.tail: _*).agg(aggFuncs.head, aggFuncs.tail: _*)

but that won't help very much if the user is to specify the aggregations in a configuration file.
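
If the aggregations must still arrive as a Map from the configuration, one option is to translate each function name into a Column yourself. A minimal sketch, assuming the configuration only ever uses the function names listed below:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{avg, count, countDistinct, max, min, sum}

// Translate one (column -> function name) entry from the configuration into a Column.
// The set of supported names here is only an example and must match the configuration.
def toAggColumn(colName: String, funcName: String): Column = funcName match {
  case "sum"           => sum(colName)
  case "min"           => min(colName)
  case "max"           => max(colName)
  case "avg"           => avg(colName)
  case "count"         => count(colName)
  case "countDistinct" => countDistinct(colName)
  case other           => throw new IllegalArgumentException(s"Unsupported aggregation: $other")
}

val resolvedAggs = aggFuncMap.map { case (c, f) => toAggColumn(c, f) }.toSeq
val df2 = df.groupBy(cols.head, cols.tail: _*).agg(resolvedAggs.head, resolvedAggs.tail: _*)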

Another approach would be to use expr, which evaluates a string and gives back a column. However, expr won't accept "countDistinct"; instead, "count(distinct(...))" needs to be used. This could be coded as follows:

import org.apache.spark.sql.functions.expr

val aggFuncs = Seq("sum(colC)", "count(distinct(colD))").map(e => expr(e))
val df2 = df.groupBy(cols.head, cols.tail: _*).agg(aggFuncs.head, aggFuncs.tail: _*)
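
Since the aggregations come from the configuration Map, the expression strings can also be built from it. A rough sketch, assuming countDistinct is the only name that needs to be rewritten:

import org.apache.spark.sql.functions.expr

// Turn each (column -> function name) entry into an expression string,
// rewriting countDistinct into the count(distinct(...)) form that expr accepts.
val aggFuncs = aggFuncMap.toSeq.map {
  case (c, "countDistinct") => expr(s"count(distinct($c))")
  case (c, f)               => expr(s"$f($c)")
}

val df2 = df.groupBy(cols.head, cols.tail: _*).agg(aggFuncs.head, aggFuncs.tail: _*)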
answered by Shaido