What is the proper way of specifying a window interval in Spark SQL, using two predefined boundaries?
I am trying to sum up values from my table over a window of "between 3 hours ago and 2 hours ago".
When I run this query:
select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 2 hours preceding and current row
) as sum_value
from my_temp_table;
That works. I get the results I expect, i.e. sums of values that fall into a 2-hour rolling window.
Now, what I need is for that rolling window not to be bound to the current row, but to take into account rows between 3 hours ago and 2 hours ago. I tried:
select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 3 hours preceding and 2 hours preceding
) as sum_value
from my_temp_table;
But I get the error: extraneous input 'hours' expecting {'PRECEDING', 'FOLLOWING'}
I also tried with:
select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 3 hours preceding and interval 2 hours preceding
) as sum_value
from my_temp_table;
but then I get a different error: scala.MatchError: CalendarIntervalType (of class org.apache.spark.sql.types.CalendarIntervalType$)
The third option I tried is:
select *, sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 3 hours preceding and 2 preceding
) as sum_value
from my_temp_table;
and, as one might expect, it doesn't work either: cannot resolve 'RANGE BETWEEN interval 3 hours PRECEDING AND 2 PRECEDING' due to data type mismatch
I am having difficulties finding documentation for the interval type, as this link doesn't say enough and the other information I found is half-baked.
Since range intervals didn't do the trick, I had to turn to an alternative approach. It goes something like this:
In my case, I had to run computations for each hour of the day and combine those "hourly" results, i.e. a list of 24 data frames, into one "daily" data frame.
The code, from a very high-level perspective, looks like this:
val hourlyDFs = for ((hourStart, hourEnd) <- hoursToStart.zip(hoursToEnd)) yield {
  val hourlyData = data.where($"hour" <= lit(hourEnd) && $"hour" >= lit(hourStart))
  // do stuff with hourlyData
  // return a data frame
}
hourlyDFs.reduce(_ union _)
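For completeness, here is a more self-contained sketch of that approach. The per-slice computation (summing value per a, b) and the hour boundaries are hypothetical stand-ins for the elided "do stuff" part; it assumes data already has an integer hour column and that spark.implicits._ is in scope for the $ syntax:

import org.apache.spark.sql.functions._

// hypothetical hour boundaries: one slice per hour of the day
val hoursToStart = 0 to 23
val hoursToEnd = 0 to 23

val hourlyDFs = hoursToStart.zip(hoursToEnd).map { case (hourStart, hourEnd) =>
  data
    .where($"hour" >= lit(hourStart) && $"hour" <= lit(hourEnd))
    // hypothetical per-slice computation: sum value per (a, b)
    .groupBy($"a", $"b")
    .agg(sum($"value").as("sum_value"))
    .withColumn("hour", lit(hourStart))
}

val dailyDF = hourlyDFs.reduce(_ union _)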
A workaround for getting the same result would be to calculate the sum of the value within the last 3 hours and then subtract the sum of the value within the last 2 hours:
select *,
sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 3 hours preceding and current row)
-
sum(value) over (
partition by a, b
order by cast(time_value as timestamp)
range between interval 2 hours preceding and current row)
as sum_value
from my_temp_table;
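If the DataFrame API is an option, the same "between 3 hours ago and 2 hours ago" frame can also be expressed with rangeBetween over an ordering column in epoch seconds, since rangeBetween takes plain long offsets. A minimal sketch, assuming a SparkSession named spark, that time_value casts cleanly to a timestamp, and the table/column names from the question:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// order by epoch seconds so that range offsets are expressed in seconds
val w = Window
  .partitionBy("a", "b")
  .orderBy(col("time_value").cast("timestamp").cast("long"))
  .rangeBetween(-3 * 3600, -2 * 3600) // from 3 hours before to 2 hours before the current row

val result = spark.table("my_temp_table")
  .withColumn("sum_value", sum(col("value")).over(w))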