I have a Spark SQL DataFrame with date data, and what I'm trying to get is all the rows preceding the current row within a given date range. For example, I want all the rows from 7 days back preceding the given row. I figured out I need to use a Window function like:
```python
Window \
    .partitionBy('id') \
    .orderBy('start')
```
and here comes the problem: I want to have a rangeBetween of 7 days, but there is nothing in the Spark docs I could find on this. Does Spark even provide such an option? For now I'm just getting all the preceding rows with:
```python
.rowsBetween(-sys.maxsize, 0)
```
but I would like to achieve something like:
```python
.rangeBetween("7 days", 0)
```
If anyone could help me on this one I'd be very grateful. Thanks in advance!
Spark >= 2.3
Since Spark 2.3 it has been possible to use interval objects with the SQL API, but DataFrame API support is still a work in progress.
```python
df.createOrReplaceTempView("df")

spark.sql(
    """SELECT *, mean(some_value) OVER (
        PARTITION BY id
        ORDER BY CAST(start AS timestamp)
        RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
     ) AS mean FROM df""").show()

## +---+----------+----------+------------------+
## | id|     start|some_value|              mean|
## +---+----------+----------+------------------+
## |  1|2015-01-01|      20.0|              20.0|
## |  1|2015-01-06|      10.0|              15.0|
## |  1|2015-01-07|      25.0|18.333333333333332|
## |  1|2015-01-12|      30.0|21.666666666666668|
## |  2|2015-01-01|       5.0|               5.0|
## |  2|2015-01-03|      30.0|              17.5|
## |  2|2015-02-01|      20.0|              20.0|
## +---+----------+----------+------------------+
```
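If you'd rather not create a temp view, one workaround is to embed the same window clause in `expr()`. This is only a sketch, not a dedicated DataFrame-level frame API; it simply routes the SQL fragment through the parser, so it should behave like the raw query above:

```python
from pyspark.sql.functions import expr

# Sketch: the interval frame is handled by the SQL parser inside expr(),
# so this should produce the same result as the raw SQL query above.
df.withColumn("mean", expr("""
    mean(some_value) OVER (
        PARTITION BY id
        ORDER BY CAST(start AS timestamp)
        RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
    )""")).show()
```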
Spark < 2.3
As far as I know it is not possible directly, in either Spark or Hive. Both require the `ORDER BY` clause used with `RANGE` to be numeric. The closest thing I found is converting to timestamp and operating on seconds. Assuming the `start` column contains a `date` type:
```python
from pyspark.sql import Row
from pyspark.sql.functions import col

row = Row("id", "start", "some_value")
df = sc.parallelize([
    row(1, "2015-01-01", 20.0),
    row(1, "2015-01-06", 10.0),
    row(1, "2015-01-07", 25.0),
    row(1, "2015-01-12", 30.0),
    row(2, "2015-01-01", 5.0),
    row(2, "2015-01-03", 30.0),
    row(2, "2015-02-01", 20.0)
]).toDF().withColumn("start", col("start").cast("date"))
```
A small helper and window definition:
```python
from pyspark.sql.window import Window
from pyspark.sql.functions import mean, col

# Hive timestamp is interpreted as UNIX timestamp in seconds*
days = lambda i: i * 86400
```
Finally, the query:
```python
w = (Window()
    .partitionBy(col("id"))
    .orderBy(col("start").cast("timestamp").cast("long"))
    .rangeBetween(-days(7), 0))

df.select(col("*"), mean("some_value").over(w).alias("mean")).show()

## +---+----------+----------+------------------+
## | id|     start|some_value|              mean|
## +---+----------+----------+------------------+
## |  1|2015-01-01|      20.0|              20.0|
## |  1|2015-01-06|      10.0|              15.0|
## |  1|2015-01-07|      25.0|18.333333333333332|
## |  1|2015-01-12|      30.0|21.666666666666668|
## |  2|2015-01-01|       5.0|               5.0|
## |  2|2015-01-03|      30.0|              17.5|
## |  2|2015-02-01|      20.0|              20.0|
## +---+----------+----------+------------------+
```
Far from pretty but works.
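If you want to convince yourself that the frame boundary really is expressed in seconds, you can inspect the long values the window orders by (a quick check against the `df` defined above):

```python
# date -> timestamp -> long yields seconds since the Unix epoch,
# which is why 7 days is written as -days(7) = -7 * 86400
df.select(
    col("start"),
    col("start").cast("timestamp").cast("long").alias("epoch_seconds")
).show()
```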
* Hive Language Manual, Types
Fantastic solution @zero323. If you want to operate with minutes instead of days, as I had to, and you don't need to partition by id, you only have to modify a small part of the code, as I show:
```python
df.createOrReplaceTempView("df")

spark.sql(
    """SELECT *, sum(total) OVER (
        ORDER BY CAST(reading_date AS timestamp)
        RANGE BETWEEN INTERVAL 45 minutes PRECEDING AND CURRENT ROW
     ) AS sum_total FROM df""").show()
```
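The same adjustment works for the pre-2.3 DataFrame workaround: swap the `days` helper for a `minutes` one. A sketch, assuming a `df` with `reading_date` and `total` columns as in the query above:

```python
from pyspark.sql.window import Window
from pyspark.sql.functions import col, sum as sum_

# Frame boundaries are in seconds, so 45 minutes = 45 * 60
minutes = lambda i: i * 60

# No partitionBy here: one global window ordered by reading_date
w = (Window()
    .orderBy(col("reading_date").cast("timestamp").cast("long"))
    .rangeBetween(-minutes(45), 0))

df.withColumn("sum_total", sum_("total").over(w)).show()
```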