I have a DataFrame as follows:
+---+---------------------+---------------------+
|id |initDate |endDate |
+---+---------------------+---------------------+
|138|2016-04-15 00:00:00.0|2016-04-28 00:00:00.0|
|138|2016-05-09 00:00:00.0|2016-05-23 00:00:00.0|
|138|2016-06-04 00:00:00.0|2016-06-18 00:00:00.0|
|138|2016-06-18 00:00:00.0|2016-07-02 00:00:00.0|
|138|2016-07-09 00:00:00.0|2016-07-23 00:00:00.0|
|138|2016-07-27 00:00:00.0|2016-08-10 00:00:00.0|
|138|2016-08-18 00:00:00.0|2016-09-01 00:00:00.0|
|138|2016-09-13 00:00:00.0|2016-09-27 00:00:00.0|
|138|2016-10-04 00:00:00.0|null |
+---+---------------------+---------------------+
The rows are ordered by id and then by initDate, in ascending order. Both initDate and endDate are of Timestamp type. For illustration, I have only shown the records belonging to one id value.
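For reproducibility, a comparable sample DataFrame can be built roughly like this (only a few of the rows above are reproduced, and the SparkSession variable name spark and the cast to timestamp are my own assumptions):

import spark.implicits._  // assuming an active SparkSession named spark

// A few rows mirroring the table above; a null endDate marks the last, open-ended interval.
val sampleDF = Seq(
  (138, "2016-04-15 00:00:00", "2016-04-28 00:00:00"),
  (138, "2016-05-09 00:00:00", "2016-05-23 00:00:00"),
  (138, "2016-10-04 00:00:00", null)
).toDF("id", "initDate", "endDate")
  // Cast the string columns to TimestampType, matching the schema described above.
  .withColumn("initDate", $"initDate".cast("timestamp"))
  .withColumn("endDate", $"endDate".cast("timestamp"))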
My goal is to add a new column showing, for each id, the difference (in days) between the initDate of each row and the endDate of the previous row. If there is no previous row, the value should be -1.
The output should look like this:
+---+---------------------+---------------------+----------+
|id |initDate |endDate |difference|
+---+---------------------+---------------------+----------+
|138|2016-04-15 00:00:00.0|2016-04-28 00:00:00.0|-1 |
|138|2016-05-09 00:00:00.0|2016-05-23 00:00:00.0|11 |
|138|2016-06-04 00:00:00.0|2016-06-18 00:00:00.0|12 |
|138|2016-06-18 00:00:00.0|2016-07-02 00:00:00.0|0 |
|138|2016-07-09 00:00:00.0|2016-07-23 00:00:00.0|7 |
|138|2016-07-27 00:00:00.0|2016-08-10 00:00:00.0|4 |
|138|2016-08-18 00:00:00.0|2016-09-01 00:00:00.0|8 |
|138|2016-09-13 00:00:00.0|2016-09-27 00:00:00.0|12 |
|138|2016-10-04 00:00:00.0|null |7 |
+---+---------------------+---------------------+----------+
I am thinking of using a window function to partition the records by id, but I cannot figure out the next steps.
Thanks to the hint from @lostInOverflow, I came up with the following solution:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
import spark.implicits._  // for the $"..." column syntax, assuming an active SparkSession named spark

// Partition by id and order by initDate, so lag() refers to the previous row of the same id.
val w = Window.partitionBy("id").orderBy("initDate")
val previousEnd = lag($"endDate", 1).over(w)

filteredDF
  .withColumn("prev", previousEnd)
  .withColumn("difference", datediff($"initDate", $"prev"))
Try:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
import spark.implicits._  // for the $"..." column syntax

// Order each id partition by initDate so lag() picks up the previous row's endDate.
val w = Window.partitionBy("id").orderBy("initDate")

// datediff() returns the difference between the two dates in days.
df.withColumn("difference", datediff($"initDate", lag($"endDate", 1).over(w)))