I have the final records (after joins and filters) in a Spark DataFrame. I need to compare consecutive rows' column values (partitioned by key) and, based on a condition, change the e_date column value. For example:
Sample table:
key1 key2 col1 col2 s_date e_date
a    1    cv1  cv2  2014   2099
a    1    cv3  cv2  2016   2099
b    2    cv5  cv6  2016   2099
b    2    cv5  cv6  2016   2099
The final table should look like this:
key1 key2 col1 col2 s_date e_date
a    1    cv1  cv2  2014   2015   (next record's s_date - 1)
a    1    cv3  cv2  2016   2099
b    2    cv5  cv6  2016   2099
The table above has a composite key, so key1 and key2 together form the key.
Compare the col1 and col2 values over a window partitioned by the keys.
If any column has a new value, close the old record by setting its e_date to the new record's s_date - 1 (rows 1 and 2 of the final table).
Any pointers in Scala/Spark?
The lead and lag window functions are already implemented in Spark:
import org.apache.spark.sql.functions.{col, lead, lag}
import org.apache.spark.sql.expressions.Window

// Previous row's s_date within each (key1, key2) partition, ordered by s_date
lag(col("s_date"), 1).over(Window.partitionBy("key1", "key2").orderBy("s_date"))
Check Introducing Window Functions in Spark SQL for details.
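Since the goal here is the next record's s_date minus one, lead (rather than lag) is the function that fits. Below is a minimal sketch of the full transformation, assuming a DataFrame named df with the columns shown above and s_date/e_date stored as integer years; df and the helper closeRecords are illustrative names, not part of the original answer.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, col, lead, lit}

// Hypothetical helper: closes each record with the next record's s_date - 1.
// Assumes s_date and e_date are integer years, as in the sample data.
def closeRecords(df: DataFrame): DataFrame = {
  val w = Window.partitionBy("key1", "key2").orderBy("s_date")

  df
    .dropDuplicates() // collapse fully identical rows (rows 3-4 of the sample)
    .withColumn(
      "e_date",
      // next record's s_date - 1; the last record in a partition has no
      // successor, so coalesce keeps its original e_date (e.g. 2099)
      coalesce(lead(col("s_date"), 1).over(w) - lit(1), col("e_date"))
    )
}

With the sample data this yields e_date = 2015 for the first a record (the next a record starts in 2016) and leaves the remaining rows at 2099, matching the expected output. If only changes in col1/col2 should close a record (rather than every new s_date), a comparison such as lead(col("col1"), 1).over(w) =!= col("col1") can gate the update.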