
spark dataframe keep most recent record

I have a dataframe similar to:

id  | date       | value
--- | ---------- | ------
1   | 2016-01-07 | 13.90
1   | 2016-01-16 | 14.50
2   | 2016-01-09 | 10.50
2   | 2016-01-28 | 5.50
3   | 2016-01-05 | 1.50

I am trying to keep the most recent values for each id, like this:

id  | date       | value
--- | ---------- | ------
1   | 2016-01-16 | 14.50
2   | 2016-01-28 | 5.50
3   | 2016-01-05 | 1.50

I have tried sorting by date in descending order and then dropping duplicates:

new_df = df.orderBy(df.date.desc()).dropDuplicates(['id'])   

My questions are: will dropDuplicates() keep the first duplicate value that it finds? And is there a better way to accomplish what I want? By the way, I'm using Python.

Thank you.

julio alberto leal rivera asked Oct 13 '16 22:10

2 Answers

The window operator as suggested works very well to solve this problem:

from datetime import date

rdd = sc.parallelize([
    [1, date(2016, 1, 7), 13.90],
    [1, date(2016, 1, 16), 14.50],
    [2, date(2016, 1, 9), 10.50],
    [2, date(2016, 1, 28), 5.50],
    [3, date(2016, 1, 5), 1.50]
])

df = rdd.toDF(['id','date','price'])
df.show()

| id|      date|price|
|  1|2016-01-07| 13.9|
|  1|2016-01-16| 14.5|
|  2|2016-01-09| 10.5|
|  2|2016-01-28|  5.5|
|  3|2016-01-05|  1.5|

df.registerTempTable("entries")  # replaced by createOrReplaceTempView in Spark 2.0

output = sqlContext.sql('''
    SELECT
        *
    FROM (
        SELECT
            *,
            dense_rank() OVER (PARTITION BY id ORDER BY date DESC) AS rank
        FROM entries
    ) vo WHERE rank = 1
''')

output.show()

| id|      date|price|rank|
|  1|2016-01-16| 14.5|   1|
|  2|2016-01-28|  5.5|   1|
|  3|2016-01-05|  1.5|   1|

Fokko Driesprong answered Oct 06 '22 00:10

If several rows for the same id share the same date, dense_rank gives them all the same rank, so you will get duplicates. Use row_number instead:

from pyspark.sql.window import Window
from datetime import date
import pyspark.sql.functions as F

rdd = spark.sparkContext.parallelize([
    [1, date(2016, 1, 7), 13.90],
    [1, date(2016, 1, 7), 10.0],  # I added this row to show the effect of duplicates
    [1, date(2016, 1, 16), 14.50],
    [2, date(2016, 1, 9), 10.50],
    [2, date(2016, 1, 28), 5.50],
    [3, date(2016, 1, 5), 1.50]])
df = rdd.toDF(['id','date','price'])

| id|      date|price|
|  1|2016-01-07| 13.9|
|  1|2016-01-07| 10.0|
|  1|2016-01-16| 14.5|
|  2|2016-01-09| 10.5|
|  2|2016-01-28|  5.5|
|  3|2016-01-05|  1.5|

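# row_number: keeping only row_number == 1 (window partitioned by id, ordered by date) gives one row per id, even with the tied dates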
| id|      date|price|row_number|
|  3|2016-01-05|  1.5|         1|
|  1|2016-01-07| 13.9|         1|
|  2|2016-01-09| 10.5|         1|

# dense_rank: keeping only dense_rank == 1 over the same window returns both tied rows for id 1
| id|      date|price|dense_rank|
|  3|2016-01-05|  1.5|         1|
|  1|2016-01-07| 13.9|         1|
|  1|2016-01-07| 10.0|         1|
|  2|2016-01-09| 10.5|         1|
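
To make the difference concrete without a Spark session, here is a rough plain-Python sketch that mimics what the two window functions do on the sample rows. It is only an illustration of the ranking semantics, not Spark code: the helper names (rank_rows, first_by_row_number, first_by_dense_rank) are made up for this example. One assumption to be aware of: among tied dates this sketch keeps whichever row came first in the input, whereas Spark does not guarantee which of the tied rows gets row_number 1. Note also that the tables above rank by ascending date, so they show the earliest row per id; to get the most recent row the question asks for, the ordering has to be descending.

```python
from datetime import date
from itertools import groupby

# Same sample rows as above, including the tied date for id 1.
rows = [
    (1, date(2016, 1, 7), 13.90),
    (1, date(2016, 1, 7), 10.0),
    (1, date(2016, 1, 16), 14.50),
    (2, date(2016, 1, 9), 10.50),
    (2, date(2016, 1, 28), 5.50),
    (3, date(2016, 1, 5), 1.50),
]


def rank_rows(rows, descending=False):
    """Yield (row, row_number, dense_rank) for each id partition, ordered by date."""
    by_id = sorted(rows, key=lambda r: r[0])
    for _, partition in groupby(by_id, key=lambda r: r[0]):
        ordered = sorted(partition, key=lambda r: r[1], reverse=descending)
        dense = 0
        previous_date = None
        for number, row in enumerate(ordered, start=1):
            if row[1] != previous_date:
                dense += 1  # dense_rank only advances when the date changes
                previous_date = row[1]
            yield row, number, dense


def first_by_row_number(rows, descending=False):
    """Keep rows whose row_number is 1: always exactly one row per id."""
    return [row for row, number, _ in rank_rows(rows, descending) if number == 1]


def first_by_dense_rank(rows, descending=False):
    """Keep rows whose dense_rank is 1: tied rows are all kept."""
    return [row for row, _, dense in rank_rows(rows, descending) if dense == 1]


print(len(first_by_row_number(rows)))   # 3 rows, one per id
print(len(first_by_dense_rank(rows)))   # 4 rows, id 1 appears twice
print(first_by_row_number(rows, descending=True))  # most recent row per id
```

With descending=True the row_number variant returns the rows dated 2016-01-16, 2016-01-28 and 2016-01-05, which is the result the question is after.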

gench answered Oct 06 '22 00:10
