
How to unpack multiple keys in a Spark DataSet

I have a Dataset with the following structure:

case class Person(age: Int, gender: String, salary: Double)

I want to compute the average salary by gender and age, so I group the Dataset by both keys. I've run into two problems. First, both keys end up combined in a single "key" column, but I want them in two separate columns. Second, the aggregated column gets a long, auto-generated name, and I can't figure out how to rename it (apparently "as" and "alias" don't work). I'd like to do all of this using the Dataset API.

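import org.apache.spark.sql.expressions.scalalang.typed
import spark.implicits._ // already in scope in spark-shell

// Arguments follow the case class order: age, gender, salary
val df = sc.parallelize(List(Person(27, "male", 100000.00),
  Person(27, "male", 120000.00),
  Person(26, "male", 95000),
  Person(31, "female", 89000),
  Person(51, "female", 250000),
  Person(51, "female", 120000)
)).toDF.as[Person]

df.groupByKey(p => (p.gender, p.age)).agg(typed.avg(_.salary)).show()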

+-----------+------------------------------------------------------------------------------------------------+
|        key| TypedAverage(line2503618a50834b67a4b132d1b8d2310b12.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$Person)|          
+-----------+------------------------------------------------------------------------------------------------+ 
|[female,31]|  89000.0... 
|[female,51]| 185000.0...
|  [male,27]| 110000.0...
|  [male,26]|  95000.0...
+-----------+------------------------------------------------------------------------------------------------+
Alberto Bonsanto asked Mar 19 '17 at 22:03



1 Answer

Aliasing is an untyped operation, so you have to turn the aliased column back into a typed column afterwards with .as[Double]. The only way to unpack the key is also to do it after the aggregation, for example with a select:

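df.groupByKey(p => (p.gender, p.age))
  // .as("...") returns an untyped Column; .as[Double] makes it a TypedColumn again
  .agg(typed.avg[Person](_.salary).as("average_salary").as[Double])
  // the tuple key becomes a struct column "key" with fields _1 and _2
  .select($"key._1", $"key._2", $"average_salary").show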
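A self-contained sketch of the answer's approach follows. It assumes Spark 2.x or 3.x built for Scala 2.12/2.13 (where `org.apache.spark.sql.expressions.scalalang.typed` is still available, though deprecated since Spark 3.0) and a local `SparkSession`. `viaSelect` follows the answer and additionally aliases the unpacked key fields with `.as("gender")` and `.as("age")`, so the output columns get readable names. `viaMap` is an alternative not taken from the answer: it stays in the typed API by mapping each `((gender, age), avg)` tuple into a case class. `AvgSalary`, `AvgSalaryExample`, `viaSelect` and `viaMap` are hypothetical names introduced for illustration.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.expressions.scalalang.typed

// Same fields as the question's case class.
case class Person(age: Int, gender: String, salary: Double)

// Hypothetical result type: its field names become the output column names.
case class AvgSalary(gender: String, age: Int, averageSalary: Double)

// Hypothetical holder object for the sketch.
object AvgSalaryExample {
  // Local session; in spark-shell you would reuse the existing `spark` instead.
  lazy val spark: SparkSession =
    SparkSession.builder().master("local[*]").appName("avg-salary").getOrCreate()

  import spark.implicits._

  lazy val people: Dataset[Person] = Seq(
    Person(27, "male", 100000.0),
    Person(27, "male", 120000.0),
    Person(26, "male", 95000.0),
    Person(31, "female", 89000.0),
    Person(51, "female", 250000.0),
    Person(51, "female", 120000.0)
  ).toDS()

  // The answer's approach: alias the aggregate (untyped), retype it with
  // .as[Double], then unpack the struct-typed "key" column with select.
  // The extra .as("gender") / .as("age") give the key fields readable names.
  lazy val viaSelect: DataFrame = people
    .groupByKey(p => (p.gender, p.age))
    .agg(typed.avg[Person](_.salary).as("average_salary").as[Double])
    .select($"key._1".as("gender"), $"key._2".as("age"), $"average_salary")

  // Typed alternative: agg yields Dataset[((String, Int), Double)], which is
  // mapped into a case class, so column names come from AvgSalary's fields.
  lazy val viaMap: Dataset[AvgSalary] = people
    .groupByKey(p => (p.gender, p.age))
    .agg(typed.avg[Person](_.salary))
    .map { case ((gender, age), avg) => AvgSalary(gender, age, avg) }
}
```

With the case-class map, the column names come from the fields of AvgSalary, so no untyped alias is needed; the trade-off is an extra deserialize/serialize step for the map. On Spark 3.x, the deprecation message for typed points to the untyped built-in aggregate functions instead.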
Justin Pihony answered Sep 30 '22 at 20:09
