 

Aggregating multiple columns with custom function in Spark

I was wondering if there is some way to specify a custom aggregation function for Spark DataFrames over multiple columns.

I have a table of the form (name, item, price), like this:

john | tomato | 1.99
john | carrot | 0.45
bill | apple  | 0.99
john | banana | 1.29
bill | taco   | 2.59

I would like to aggregate each item and its cost for each person into a list like this:

john | (tomato, 1.99), (carrot, 0.45), (banana, 1.29)
bill | (apple, 0.99), (taco, 2.59)

Is this possible with DataFrames? I recently learned about collect_list, but it appears to work on only one column.

asked Jun 09 '16 by anthonybell



2 Answers

Consider using the struct function to group the columns together before collecting as a list:

import org.apache.spark.sql.functions.{collect_list, struct}
import sqlContext.implicits._

val df = Seq(
  ("john", "tomato", 1.99),
  ("john", "carrot", 0.45),
  ("bill", "apple", 0.99),
  ("john", "banana", 1.29),
  ("bill", "taco", 2.59)
).toDF("name", "food", "price")

df.groupBy($"name")
  .agg(collect_list(struct($"food", $"price")).as("foods"))
  .show(false)

Outputs:

+----+---------------------------------------------+
|name|foods                                        |
+----+---------------------------------------------+
|john|[[tomato,1.99], [carrot,0.45], [banana,1.29]]|
|bill|[[apple,0.99], [taco,2.59]]                  |
+----+---------------------------------------------+
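If you later need the (food, price) pairs back as individual rows, the struct fields remain addressable by name. A minimal follow-on sketch (not part of the original answer; it reuses the df defined above and Spark's built-in explode function):

import org.apache.spark.sql.functions.{collect_list, explode, struct}

// Collect the (food, price) structs per name as before.
val grouped = df.groupBy($"name")
  .agg(collect_list(struct($"food", $"price")).as("foods"))

// explode turns each element of the "foods" array back into its own row;
// the struct fields are then accessed as item.food / item.price.
grouped
  .select($"name", explode($"foods").as("item"))
  .select($"name", $"item.food", $"item.price")
  .show(false)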
answered Oct 04 '22 by Daniel Siegmann

The easiest way to do this as a DataFrame is to first collect two lists, and then use a UDF to zip the two lists together. Something like:

import org.apache.spark.sql.functions.{col, collect_list, udf}
import sqlContext.implicits._

// UDF that zips the two collected lists into a list of (food, price) pairs.
val zipper = udf[Seq[(String, Double)], Seq[String], Seq[Double]](_.zip(_))

val df = Seq(
  ("john", "tomato", 1.99),
  ("john", "carrot", 0.45),
  ("bill", "apple", 0.99),
  ("john", "banana", 1.29),
  ("bill", "taco", 2.59)
).toDF("name", "food", "price")

val df2 = df.groupBy("name").agg(
  collect_list(col("food")) as "food",
  collect_list(col("price")) as "price"
).withColumn("food", zipper(col("food"), col("price"))).drop("price")

df2.show(false)
// +----+---------------------------------------------+
// |name|food                                         |
// +----+---------------------------------------------+
// |john|[[tomato,1.99], [carrot,0.45], [banana,1.29]]|
// |bill|[[apple,0.99], [taco,2.59]]                  |
// +----+---------------------------------------------+
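On Spark 2.4 or later you can skip the UDF entirely: the built-in arrays_zip function zips the two collected arrays element-wise. A sketch under that version assumption (not from the original answer; it reuses the df defined above):

import org.apache.spark.sql.functions.{arrays_zip, col, collect_list}

// Same grouping as above, but zip the two arrays with the built-in
// arrays_zip instead of a custom UDF (requires Spark 2.4+).
val df3 = df.groupBy("name").agg(
    collect_list(col("food")) as "food",
    collect_list(col("price")) as "price"
  )
  .withColumn("food", arrays_zip(col("food"), col("price")))
  .drop("price")

df3.show(false)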
answered Oct 04 '22 by David Griffin