Spark DataFrames: Reducing by Key

Let's say I have a data structure like this, where ts is some timestamp:

case class Record(ts: Long, id: Int, value: Int)

Given a large number of these records, I want to end up with the record with the highest timestamp for each id. Using the RDD API, I think the following code gets the job done:

def findLatest(records: RDD[Record])(implicit spark: SparkSession) = {
  records.keyBy(_.id).reduceByKey{
    (x, y) => if(x.ts > y.ts) x else y
  }.values
}

Likewise, this is my attempt with Datasets:

def findLatest(records: Dataset[Record])(implicit spark: SparkSession) = {
  records.groupByKey(_.id).mapGroups{
    case(id, records) => {
      records.reduceLeft((x,y) => if (x.ts > y.ts) x else y)
    }
  }
}

I've been trying to work out how to achieve something similar with DataFrames, but to no avail. I realise I can do the grouping with:

records.groupBy($"id")

But that gives me a RelationalGroupedDataset, and it's not clear to me what aggregation function I need to write to achieve what I want. All the example aggregations I've seen appear to return just a single aggregated column rather than the whole row.

Is it possible to achieve this using DataFrames?

asked Dec 20 '16 by d80tb7


1 Answer

You can use the argmax logic (see the Databricks example).

For example, let's say your DataFrame is called df and it has the columns id, val, and ts; you would do something like this:

import org.apache.spark.sql.functions._

val newDF = df.groupBy('id).agg(max(struct('ts, 'val)) as 'tmp).select($"id", $"tmp.*")
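
For completeness, here is a minimal, self-contained sketch of the same idea applied to the Record schema from the question (columns ts, id, value rather than val); the sample data and local SparkSession setup are assumed for illustration. max over a struct compares its fields left to right, so putting ts first makes it pick the row with the latest timestamp per id:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{max, struct}

case class Record(ts: Long, id: Int, value: Int)

// assumed local session, just for the sketch
val spark = SparkSession.builder.appName("latest-per-id").master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(
  Record(1L, 1, 10),
  Record(2L, 1, 20),   // latest for id 1
  Record(1L, 2, 30)    // only (and latest) record for id 2
).toDF()

// struct ordering compares fields left to right, so ts must come first
val latest = df
  .groupBy($"id")
  .agg(max(struct($"ts", $"value")) as "tmp")
  .select($"id", $"tmp.ts", $"tmp.value")

latest.show()
// +---+---+-----+
// | id| ts|value|
// +---+---+-----+
// |  1|  2|   20|
// |  2|  1|   30|
// +---+---+-----+
// (row order may differ)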
answered Nov 10 '22 by Assaf Mendelson