I was wondering if there is some way to specify a custom aggregation function for spark dataframes over multiple columns.
I have a table like this of the type (name, item, price):
john | tomato | 1.99 john | carrot | 0.45 bill | apple | 0.99 john | banana | 1.29 bill | taco | 2.59
to:
I would like to aggregate the item and it's cost for each person into a list like this:
john | (tomato, 1.99), (carrot, 0.45), (banana, 1.29) bill | (apple, 0.99), (taco, 2.59)
Is this possible in dataframes? I recently learned about collect_list
but it appears to only work for one column.
Data frame in use: In PySpark, groupBy() is used to collect the identical data into groups on the PySpark DataFrame and perform aggregate functions on the grouped data. So by this we can do multiple aggregations at a time. where, column_name_group is the column to be grouped.
Using concat() Function to Concatenate DataFrame Columns Spark SQL functions provide concat() to concatenate two or more DataFrame columns into a single Column. It can also take columns of different Data Types and concatenate them into a single column. for example, it supports String, Int, Boolean and also arrays.
Consider using the struct
function to group the columns together before collecting as a list:
import org.apache.spark.sql.functions.{collect_list, struct} import sqlContext.implicits._ val df = Seq( ("john", "tomato", 1.99), ("john", "carrot", 0.45), ("bill", "apple", 0.99), ("john", "banana", 1.29), ("bill", "taco", 2.59) ).toDF("name", "food", "price") df.groupBy($"name") .agg(collect_list(struct($"food", $"price")).as("foods")) .show(false)
Outputs:
+----+---------------------------------------------+ |name|foods | +----+---------------------------------------------+ |john|[[tomato,1.99], [carrot,0.45], [banana,1.29]]| |bill|[[apple,0.99], [taco,2.59]] | +----+---------------------------------------------+
The easiest way to do this as a DataFrame
is to first collect two lists, and then use a UDF
to zip
the two lists together. Something like:
import org.apache.spark.sql.functions.{collect_list, udf} import sqlContext.implicits._ val zipper = udf[Seq[(String, Double)], Seq[String], Seq[Double]](_.zip(_)) val df = Seq( ("john", "tomato", 1.99), ("john", "carrot", 0.45), ("bill", "apple", 0.99), ("john", "banana", 1.29), ("bill", "taco", 2.59) ).toDF("name", "food", "price") val df2 = df.groupBy("name").agg( collect_list(col("food")) as "food", collect_list(col("price")) as "price" ).withColumn("food", zipper(col("food"), col("price"))).drop("price") df2.show(false) # +----+---------------------------------------------+ # |name|food | # +----+---------------------------------------------+ # |john|[[tomato,1.99], [carrot,0.45], [banana,1.29]]| # |bill|[[apple,0.99], [taco,2.59]] | # +----+---------------------------------------------+
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With