I have two dataframes, each with a single date column, e.g.:
+-----------+
|  DEADLINES|
+-----------+
| 2023-07-15|
| 2018-08-10|
| 2022-03-28|
| 2021-06-22|
| 2021-12-18|
| 2021-10-11|
| 2021-11-13|
+-----------+
+----------+
|   DT_DATE|
+----------+
|2021-04-02|
|2021-04-21|
|2021-05-01|
|2021-06-03|
|2021-09-07|
|2021-10-12|
|2021-11-02|
+----------+
I need to count how many DT_DATE values fall between a given reference date and each of the DEADLINES dates.
For example, using 2021-03-31 as the reference date should give the following result set:
+-----------+------------+
|  DEADLINES|    dt_count|
+-----------+------------+
| 2023-07-15|           7|
| 2018-08-10|           0|
| 2022-03-28|           7|
| 2021-06-22|           4|
| 2021-12-18|           7|
| 2021-10-11|           5|
| 2021-11-13|           7|
+-----------+------------+
I managed to make it work by iterating over each row of the deadlines dataframe, but performance became very poor on a larger dataset.
Does anyone have a better solution?
Edit: this is my current solution:
def count_days(deadlines_df, dates_df, ref_date):
    for row in deadlines_df.collect():
        qtt = dates_df.filter(dates_df.DT_DATE.between(ref_date, row.DEADLINES)).count()
        yield row.DEADLINES, qtt

new_df = spark.createDataFrame(count_days(deadlines_df, dates_df, "2021-03-31"), ["DEADLINES", "dt_count"])
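The two dataframes can be unioned with a different weight on each (0 for deadlines, 1 for dates on or after the reference date), and a running sum of the weights computed over a Window whose range runs from the start to the current row (Scala):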
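import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, sum}
import spark.implicits._ // assumes a SparkSession named `spark`, as in spark-shell

val deadlines = Seq(
  ("2023-07-15"),
  ("2018-08-10"),
  ("2022-03-28"),
  ("2021-06-22"),
  ("2021-12-18"),
  ("2021-10-11"),
  ("2021-11-13")
).toDF("DEADLINES")

val dates = Seq(
  ("2021-04-02"),
  ("2021-04-21"),
  ("2021-05-01"),
  ("2021-06-03"),
  ("2021-09-07"),
  ("2021-10-12"),
  ("2021-11-02")
).toDF("DT_DATE")

val referenceDate = "2021-03-31"

// Deadlines get weight 0; dates on or after the reference date get weight 1.
// union matches columns by position, so DT_DATE values land in the DEADLINES column.
val united = deadlines.withColumn("weight", lit(0))
  .union(
    dates
      .where($"DT_DATE" >= referenceDate)
      .withColumn("weight", lit(1))
  )

// Frame covering every row whose date is <= the current row's date (ties included).
val fromStartToCurrentRowWindow = Window.orderBy("DEADLINES").rangeBetween(Window.unboundedPreceding, Window.currentRow)

// The running sum of weights is the number of dates up to each deadline;
// afterwards only the deadline rows (weight 0) are kept.
val result = united
  .withColumn("dt_count", sum("weight").over(fromStartToCurrentRowWindow))
  .where($"weight" === lit(0))
  .drop("weight")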
Output:
+----------+--------+
|DEADLINES |dt_count|
+----------+--------+
|2018-08-10|0 |
|2021-06-22|4 |
|2021-10-11|5 |
|2021-11-13|7 |
|2021-12-18|7 |
|2022-03-28|7 |
|2023-07-15|7 |
+----------+--------+
Note: because the window has no partitionBy, the calculation runs in a single partition, and Spark logs this warning: WARN Logging - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
Another possible solution is to join the two dataframes on the date range, but that leads to a Cartesian join.