SPARK, DataFrame: difference of Timestamp columns over consecutive rows

I have a DataFrame as follows:

+---+---------------------+---------------------+
|id |initDate             |endDate              |
+---+---------------------+---------------------+
|138|2016-04-15 00:00:00.0|2016-04-28 00:00:00.0|
|138|2016-05-09 00:00:00.0|2016-05-23 00:00:00.0|
|138|2016-06-04 00:00:00.0|2016-06-18 00:00:00.0|
|138|2016-06-18 00:00:00.0|2016-07-02 00:00:00.0|
|138|2016-07-09 00:00:00.0|2016-07-23 00:00:00.0|
|138|2016-07-27 00:00:00.0|2016-08-10 00:00:00.0|
|138|2016-08-18 00:00:00.0|2016-09-01 00:00:00.0|
|138|2016-09-13 00:00:00.0|2016-09-27 00:00:00.0|
|138|2016-10-04 00:00:00.0|null                 |
+---+---------------------+---------------------+

The rows are ordered by id, then by the initDate column in ascending order. Both initDate and endDate are of Timestamp type. For illustrative purposes, I have only shown the records belonging to one id value.

My goal is to add a new column showing, for each id, the difference (in days) between the initDate of each row and the endDate of the previous row.

If there is no previous row, then the value will be -1.

The output should look like this:

+---+---------------------+---------------------+----------+
|id |initDate             |endDate              |difference|
+---+---------------------+---------------------+----------+
|138|2016-04-15 00:00:00.0|2016-04-28 00:00:00.0|-1        |
|138|2016-05-09 00:00:00.0|2016-05-23 00:00:00.0|11        |
|138|2016-06-04 00:00:00.0|2016-06-18 00:00:00.0|12        |
|138|2016-06-18 00:00:00.0|2016-07-02 00:00:00.0|0         |
|138|2016-07-09 00:00:00.0|2016-07-23 00:00:00.0|7         |
|138|2016-07-27 00:00:00.0|2016-08-10 00:00:00.0|4         |
|138|2016-08-18 00:00:00.0|2016-09-01 00:00:00.0|8         |
|138|2016-09-13 00:00:00.0|2016-09-27 00:00:00.0|12        |
|138|2016-10-04 00:00:00.0|null                 |7         |
+---+---------------------+---------------------+----------+

I am thinking of using a window function to partition the records by id, but I cannot figure out the next steps.

asked Oct 31 '16 by Rami
2 Answers

Thanks to the hint from @lostInOverflow, I came up with the following solution:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
import spark.implicits._  // needed for the $"..." column syntax

// Partition by id and order by initDate, so lag() refers to the previous row of the same id
val w = Window.partitionBy("id").orderBy("initDate")
val previousEnd = lag($"endDate", 1).over(w)

// datediff returns null when there is no previous row; coalesce maps that to -1
filteredDF.withColumn("difference",
  coalesce(datediff($"initDate", previousEnd), lit(-1)))
answered by Rami


Try:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
import spark.implicits._

// Order by initDate (the column the rows are sorted by within each id)
val w = Window.partitionBy("id").orderBy("initDate")

// Use datediff, not date_sub: date_sub expects an integer number of days,
// whereas datediff takes two date/timestamp columns
df.withColumn("difference", datediff($"initDate", lag($"endDate", 1).over(w)))
answered by user6022341