I have a dataframe with schema as such:
[visitorId: string, trackingIds: array<string>, emailIds: array<string>]
I'm looking for a way to group (or maybe rollup?) this dataframe by visitorId so that the trackingIds and emailIds columns are appended together. For example, if my initial df looks like:
+---------+------------+--------+
|visitorId| trackingIds|emailIds|
+---------+------------+--------+
|     a158|      [666b]|    [12]|
|     7g21|      [c0b5]|    [45]|
|     7g21|      [c0b4]|    [87]|
|     a158|[666b, 777c]|      []|
+---------+------------+--------+
I would like my output df to look like this:
+---------+------------------+--------+
|visitorId|       trackingIds|emailIds|
+---------+------------------+--------+
|     a158|[666b, 666b, 777c]|[12, '']|
|     7g21|      [c0b5, c0b4]|[45, 87]|
+---------+------------------+--------+
I've attempted to use the groupBy and agg operators, but haven't had much luck.
In summary, the Spark SQL functions collect_list() and collect_set() aggregate data into a list and return an ArrayType column. collect_set() de-dupes the data and returns only unique values, whereas collect_list() returns the values as-is, without eliminating duplicates.
Does that mean collect_list() also maintains order? If the dataset is sorted before collect_list() is applied, then yes.
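A minimal sketch of the difference between the two, using hypothetical toy data (and assuming spark.implicits._ is in scope, as in the rest of the examples):

import org.apache.spark.sql.functions.{collect_list, collect_set}

val tags = Seq(("a158", "x"), ("a158", "x"), ("a158", "y")).toDF("visitorId", "tag")

tags.groupBy($"visitorId")
  .agg(
    collect_list($"tag").alias("all_tags"),      // keeps duplicates, e.g. [x, x, y]
    collect_set($"tag").alias("distinct_tags"))  // de-duplicated,     e.g. [x, y]
  .show()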
flatten() creates a single array from an array of arrays (a nested array). If the structure of nested arrays is deeper than two levels, only one level of nesting is removed.
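A small sketch of that single-level behaviour, using hypothetical nested literals (Spark 2.4+):

import org.apache.spark.sql.functions.{flatten, typedLit}

val nested = Seq(1).toDF("id")
  .withColumn("twoLevels",   typedLit(Seq(Seq("a", "b"), Seq("c"))))
  .withColumn("threeLevels", typedLit(Seq(Seq(Seq("a"), Seq("b")), Seq(Seq("c")))))

nested.select(
  flatten($"twoLevels"),    // -> [a, b, c]
  flatten($"threeLevels")   // -> [[a], [b], [c]]  (only one level of nesting removed)
).show(false)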
The groupBy method is defined on the Dataset class. groupBy returns a RelationalGroupedDataset object, which is where the agg() method is defined. Spark makes good use of object-oriented design here: the RelationalGroupedDataset class also defines shortcut aggregations such as sum(), which can produce the same result with less code when the aggregation is a simple numeric one.
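For instance, with hypothetical numeric data, both of the following produce the same grouped totals; sum() is just the shorter path through the same RelationalGroupedDataset:

import org.apache.spark.sql.functions.sum

val sales = Seq(("a158", 10), ("a158", 5), ("7g21", 3)).toDF("visitorId", "amount")

sales.groupBy($"visitorId").agg(sum($"amount")).show()
sales.groupBy($"visitorId").sum("amount").show()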
Spark >= 2.4
You can replace the flatten udf with the built-in flatten function:

import org.apache.spark.sql.functions.flatten

leaving the rest as-is.
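Put together, a minimal sketch of the 2.4+ variant (reusing the df and dfWithPlaceholders defined in the next section) might look like:

import org.apache.spark.sql.functions.{collect_list, flatten}

// Here flatten is the built-in SQL function, not the udf used below.
dfWithPlaceholders
  .groupBy($"visitorId")
  .agg(
    flatten(collect_list($"trackingIds")).alias("trackingIds"),
    flatten(collect_list($"emailIds")).alias("emailIds"))
  .show()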
Spark >= 2.0, < 2.4
It is possible, but quite expensive. Using the data you've provided:
case class Record(
  visitorId: String,
  trackingIds: Array[String],
  emailIds: Array[String])

val df = Seq(
  Record("a158", Array("666b"), Array("12")),
  Record("7g21", Array("c0b5"), Array("45")),
  Record("7g21", Array("c0b4"), Array("87")),
  Record("a158", Array("666b", "777c"), Array.empty[String])
).toDF
and a helper function:
import org.apache.spark.sql.functions.udf

val flatten = udf((xs: Seq[Seq[String]]) => xs.flatten)
we can fill the blanks with placeholders:
import org.apache.spark.sql.functions.{array, lit, size, when}

val dfWithPlaceholders = df.withColumn(
  "emailIds",
  when(size($"emailIds") === 0, array(lit(""))).otherwise($"emailIds"))
and then collect_list and flatten:
import org.apache.spark.sql.functions.collect_list

val emailIds = flatten(collect_list($"emailIds")).alias("emailIds")
val trackingIds = flatten(collect_list($"trackingIds")).alias("trackingIds")

dfWithPlaceholders
  .groupBy($"visitorId")
  .agg(trackingIds, emailIds)

// +---------+------------------+--------+
// |visitorId|       trackingIds|emailIds|
// +---------+------------------+--------+
// |     a158|[666b, 666b, 777c]|  [12, ]|
// |     7g21|      [c0b5, c0b4]|[45, 87]|
// +---------+------------------+--------+
With a statically typed Dataset:
dfWithPlaceholders.as[Record]
  .groupByKey(_.visitorId)
  .mapGroups { case (key, vs) =>
    vs.map(v => (v.trackingIds, v.emailIds)).toArray.unzip match {
      case (trackingIds, emailIds) =>
        Record(key, trackingIds.flatten, emailIds.flatten)
    }
  }

// +---------+------------------+--------+
// |visitorId|       trackingIds|emailIds|
// +---------+------------------+--------+
// |     a158|[666b, 666b, 777c]|  [12, ]|
// |     7g21|      [c0b5, c0b4]|[45, 87]|
// +---------+------------------+--------+
Spark 1.x
You can convert to an RDD and group:
import org.apache.spark.sql.Row

dfWithPlaceholders.rdd
  .map { case Row(
      id: String,
      trcks: Seq[String @unchecked],
      emails: Seq[String @unchecked]) =>
    (id, (trcks, emails))
  }
  .groupByKey
  .map { case (key, vs) =>
    vs.toArray.unzip match {
      case (trackingIds, emailIds) =>
        Record(key, trackingIds.flatten, emailIds.flatten)
    }
  }
  .toDF

// +---------+------------------+--------+
// |visitorId|       trackingIds|emailIds|
// +---------+------------------+--------+
// |     7g21|      [c0b5, c0b4]|[45, 87]|
// |     a158|[666b, 666b, 777c]|  [12, ]|
// +---------+------------------+--------+