
How to aggregate values into collection after groupBy?

I have a dataframe with the following schema:

[visitorId: string, trackingIds: array<string>, emailIds: array<string>] 

I'm looking for a way to group (or maybe rollup?) this dataframe by visitorId so that the trackingIds and emailIds columns are appended together. For example, if my initial df looks like:

visitorId |trackingIds   |emailIds
----------+--------------+--------
a158      |[666b]        |[12]
7g21      |[c0b5]        |[45]
7g21      |[c0b4]        |[87]
a158      |[666b, 777c]  |[]

I would like my output df to look like this

visitorId |trackingIds         |emailIds
----------+--------------------+---------
a158      |[666b, 666b, 777c]  |[12, '']
7g21      |[c0b5, c0b4]        |[45, 87]

I've been attempting to use the groupBy and agg operators, but haven't had much luck.

Eric Patterson asked Dec 10 '15 13:12


People also ask

What does collect_set do?

In summary, the Spark SQL functions collect_list() and collect_set() aggregate data into a list and return an ArrayType. collect_set() de-dupes the data and returns only unique values, whereas collect_list() returns the values as-is, without eliminating duplicates.
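A minimal sketch illustrating the difference (the column names are hypothetical, and spark.implicits._ is assumed to be in scope, as in spark-shell):

import org.apache.spark.sql.functions.{collect_list, collect_set}

// Hypothetical input with duplicate tracking ids per visitor.
val events = Seq(
  ("a158", "666b"), ("a158", "666b"), ("a158", "777c")).toDF("visitorId", "trackingId")

events.groupBy($"visitorId").agg(
  collect_list($"trackingId").alias("asList"),  // [666b, 666b, 777c] -- duplicates kept
  collect_set($"trackingId").alias("asSet")     // [666b, 777c]       -- duplicates removed
).show(false)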

Does collect_list maintain order?

Does that mean collect_list() also maintains the order? In your code, you sort the entire dataset before collect_list(), so yes.
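A minimal sketch of that pattern (the ts column is hypothetical); note that relying on the sorted order surviving the shuffle into collect_list() is an assumption rather than a documented guarantee:

import org.apache.spark.sql.functions.collect_list

// Hypothetical input with an explicit ordering column.
val clicks = Seq(("a158", 2L, "y"), ("a158", 1L, "x")).toDF("visitorId", "ts", "value")

clicks
  .orderBy($"ts")                // sort the whole dataset first
  .groupBy($"visitorId")
  .agg(collect_list($"value"))   // values typically come out in ts order
  .show(false)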

What is flattening in Spark?

Flatten – creates a single array from an array of arrays (a nested array). If the structure of nested arrays is deeper than two levels, only one level of nesting is removed.
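A minimal sketch (Spark >= 2.4; the column names are hypothetical):

import org.apache.spark.sql.functions.flatten

// Hypothetical nested-array column.
val nested = Seq(("id1", Seq(Seq("a", "b"), Seq("c")))).toDF("id", "xs")

nested.select(flatten($"xs").alias("flat")).show(false)
// +---------+
// |flat     |
// +---------+
// |[a, b, c]|
// +---------+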

How does groupBy work in Spark?

The groupBy method is defined in the Dataset class. groupBy returns a RelationalGroupedDataset object, which is where the agg() method is defined. Spark makes great use of object-oriented programming! The RelationalGroupedDataset class also defines a sum() method that can be used to get the same result with less code.
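A minimal sketch of that flow (the sales dataframe is hypothetical):

import org.apache.spark.sql.functions.sum

// Hypothetical input illustrating groupBy -> RelationalGroupedDataset -> agg()/sum().
val sales = Seq(("a158", 10), ("a158", 5), ("7g21", 3)).toDF("visitorId", "amount")

val grouped = sales.groupBy($"visitorId")  // RelationalGroupedDataset

grouped.agg(sum($"amount")).show()         // via agg()
grouped.sum("amount").show()               // same result with less code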


1 Answer

Spark >= 2.4

You can replace the flatten udf with the built-in flatten function:

import org.apache.spark.sql.functions.flatten 

leaving the rest as-is.
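For example, a minimal sketch of the Spark >= 2.4 version, reusing the dfWithPlaceholders defined in the section below:

import org.apache.spark.sql.functions.{collect_list, flatten}

// Same aggregation as in the 2.0-2.3 version, but with the built-in flatten instead of the udf.
dfWithPlaceholders
  .groupBy($"visitorId")
  .agg(
    flatten(collect_list($"trackingIds")).alias("trackingIds"),
    flatten(collect_list($"emailIds")).alias("emailIds"))
  .show(false)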

Spark >= 2.0, < 2.4

It is possible, but quite expensive. Using the data you've provided:

case class Record(
  visitorId: String, trackingIds: Array[String], emailIds: Array[String])

val df = Seq(
  Record("a158", Array("666b"), Array("12")),
  Record("7g21", Array("c0b5"), Array("45")),
  Record("7g21", Array("c0b4"), Array("87")),
  Record("a158", Array("666b", "777c"), Array.empty[String])).toDF

and a helper function:

import org.apache.spark.sql.functions.udf

val flatten = udf((xs: Seq[Seq[String]]) => xs.flatten)

we can fill the blanks with placeholders:

import org.apache.spark.sql.functions.{array, lit, size, when}

val dfWithPlaceholders = df.withColumn(
  "emailIds",
  when(size($"emailIds") === 0, array(lit(""))).otherwise($"emailIds"))

collect_list and flatten:

import org.apache.spark.sql.functions.collect_list

val emailIds = flatten(collect_list($"emailIds")).alias("emailIds")
val trackingIds = flatten(collect_list($"trackingIds")).alias("trackingIds")

dfWithPlaceholders
  .groupBy($"visitorId")
  .agg(trackingIds, emailIds)

// +---------+------------------+--------+
// |visitorId|       trackingIds|emailIds|
// +---------+------------------+--------+
// |     a158|[666b, 666b, 777c]|  [12, ]|
// |     7g21|      [c0b5, c0b4]|[45, 87]|
// +---------+------------------+--------+

With statically typed Dataset:

dfWithPlaceholders.as[Record]
  .groupByKey(_.visitorId)
  .mapGroups { case (key, vs) =>
    vs.map(v => (v.trackingIds, v.emailIds)).toArray.unzip match {
      case (trackingIds, emailIds) =>
        Record(key, trackingIds.flatten, emailIds.flatten)
    }
  }

// +---------+------------------+--------+
// |visitorId|       trackingIds|emailIds|
// +---------+------------------+--------+
// |     a158|[666b, 666b, 777c]|  [12, ]|
// |     7g21|      [c0b5, c0b4]|[45, 87]|
// +---------+------------------+--------+

Spark 1.x

You can convert to an RDD and group:

import org.apache.spark.sql.Row

dfWithPlaceholders.rdd
  .map {
    case Row(
      id: String,
      trcks: Seq[String @unchecked],
      emails: Seq[String @unchecked]) => (id, (trcks, emails))
  }
  .groupByKey
  .map { case (key, vs) => vs.toArray.unzip match {
    case (trackingIds, emailIds) =>
      Record(key, trackingIds.flatten, emailIds.flatten)
  }}
  .toDF

// +---------+------------------+--------+
// |visitorId|       trackingIds|emailIds|
// +---------+------------------+--------+
// |     7g21|      [c0b5, c0b4]|[45, 87]|
// |     a158|[666b, 666b, 777c]|  [12, ]|
// +---------+------------------+--------+
zero323 answered Sep 26 '22 06:09