 

How to implement LEAD and LAG in Spark-scala

I have final records (after joins and filters) in a Spark DataFrame. I need to compare consecutive rows' column values (partitioned by key) and, based on a condition, change the e_date column value. For example:

    sample table
    key1  key2  col1  col2  s_date  e_date
    a     1     cv1   cv2   2014    2099
    a     1     cv3   cv2   2016    2099
    b     2     cv5   cv6   2016    2099
    b     2     cv5   cv6   2016    2099

    final table should look like
    key1  key2  col1  col2  s_date  e_date
    a     1     cv1   cv2   2014    2015   (next record's s_date - 1)
    a     1     cv3   cv2   2016    2099
    b     2     cv5   cv6   2016    2099
  1. The above table has a composite key, so key1 and key2 together form the key.

  2. Compare the col1 and col2 values of consecutive rows over a window partitioned by the keys.

  3. If any column has a new value, close the old record by setting its e_date to the new record's s_date - 1 (lines 1 and 2 in the final table).

  4. If nothing changed, ignore the new record (line 3 in the final table).

Any pointers in Scala/Spark?

asked Jun 10 '16 by user2895589

1 Answer

Lead and lag are already implemented:

import org.apache.spark.sql.functions.{lead, lag}
import org.apache.spark.sql.expressions.Window
import spark.implicits._  // enables the 'colName symbol syntax

lag('s_date, 1).over(Window.partitionBy('key1, 'key2).orderBy('s_date))

Check Introducing Window Functions in Spark SQL for details.
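Putting the pieces together, here is a minimal end-to-end sketch of the steps from the question. It uses lead (the next row's s_date) rather than lag, and dropDuplicates for the "ignore unchanged new record" case; the column and table names come from the question, while the local SparkSession setup is an assumption for a standalone run:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lead, when}
import org.apache.spark.sql.expressions.Window

val spark = SparkSession.builder
  .appName("lead-lag-demo")
  .master("local[*]")   // assumption: local test session
  .getOrCreate()
import spark.implicits._

// Sample data from the question.
val df = Seq(
  ("a", 1, "cv1", "cv2", 2014, 2099),
  ("a", 1, "cv3", "cv2", 2016, 2099),
  ("b", 2, "cv5", "cv6", 2016, 2099),
  ("b", 2, "cv5", "cv6", 2016, 2099)
).toDF("key1", "key2", "col1", "col2", "s_date", "e_date")

val w = Window.partitionBy("key1", "key2").orderBy("s_date")

val result = df
  .dropDuplicates()                                     // step 4: drop the exact-duplicate new record
  .withColumn("next_s", lead(col("s_date"), 1).over(w)) // next record's s_date, null for the last row
  .withColumn("e_date",
    when(col("next_s").isNotNull, col("next_s") - 1)    // step 3: close the old record
      .otherwise(col("e_date")))
  .drop("next_s")

result.orderBy("key1", "key2", "s_date").show()
```

Note that dropDuplicates only handles rows that repeat exactly, as in the sample data; if an unchanged record can arrive with a different s_date, you would instead compare col1/col2 against lag values over the same window before filtering.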

answered Oct 17 '22 by user6022341