Spark Window Functions - rangeBetween dates

I have a Spark SQL DataFrame with date data, and what I'm trying to get is all the rows preceding the current row within a given date range. So for example I want to have all the rows from 7 days back preceding the given row. I figured out I need to use a Window function like:

Window \
    .partitionBy('id') \
    .orderBy('start')

and here comes the problem. I want to have a rangeBetween of 7 days, but there is nothing in the Spark docs I could find on this. Does Spark even provide such an option? For now I'm just getting all the preceding rows with:

.rowsBetween(-sys.maxsize, 0) 

but would like to achieve something like:

.rangeBetween("7 days", 0) 

If anyone could help me on this one I'll be very grateful. Thanks in advance!
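
For context, here is a fuller sketch of the workaround I'm using now (the some_value column name is just a placeholder for whatever column I'm aggregating):

import sys
from pyspark.sql import Window
from pyspark.sql.functions import mean

# Take every preceding row in the partition, regardless of date.
w = (Window
     .partitionBy('id')
     .orderBy('start')
     .rowsBetween(-sys.maxsize, 0))

df.withColumn('mean', mean('some_value').over(w)).show()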

asked Oct 19 '15 by Nhor



2 Answers

Spark >= 2.3

Since Spark 2.3 it is possible to use interval objects with the SQL API, but DataFrame API support is still a work in progress.

df.createOrReplaceTempView("df")

spark.sql(
    """SELECT *, mean(some_value) OVER (
        PARTITION BY id
        ORDER BY CAST(start AS timestamp)
        RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
     ) AS mean FROM df""").show()

## +---+----------+----------+------------------+
## | id|     start|some_value|              mean|
## +---+----------+----------+------------------+
## |  1|2015-01-01|      20.0|              20.0|
## |  1|2015-01-06|      10.0|              15.0|
## |  1|2015-01-07|      25.0|18.333333333333332|
## |  1|2015-01-12|      30.0|21.666666666666668|
## |  2|2015-01-01|       5.0|               5.0|
## |  2|2015-01-03|      30.0|              17.5|
## |  2|2015-02-01|      20.0|              20.0|
## +---+----------+----------+------------------+
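
If you want to keep the rest of the pipeline in the DataFrame API, the same interval frame can, as far as I know, also be written through expr, which accepts an OVER clause in recent Spark versions. A sketch only, assuming the same df as above:

from pyspark.sql.functions import expr

# Same interval-based frame as the SQL above, passed as an expression string.
df.withColumn(
    "mean",
    expr("""mean(some_value) OVER (
        PARTITION BY id
        ORDER BY CAST(start AS timestamp)
        RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
    )""")
).show()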

Spark < 2.3

As far as I know it is not possible directly, either in Spark or in Hive. Both require the ORDER BY clause used with RANGE to be numeric. The closest thing I found is converting to timestamps and operating on seconds. Assuming the start column contains a date type:

from pyspark.sql import Row
from pyspark.sql.functions import col

row = Row("id", "start", "some_value")
df = sc.parallelize([
    row(1, "2015-01-01", 20.0),
    row(1, "2015-01-06", 10.0),
    row(1, "2015-01-07", 25.0),
    row(1, "2015-01-12", 30.0),
    row(2, "2015-01-01", 5.0),
    row(2, "2015-01-03", 30.0),
    row(2, "2015-02-01", 20.0)
]).toDF().withColumn("start", col("start").cast("date"))

A small helper and window definition:

from pyspark.sql.window import Window
from pyspark.sql.functions import mean, col

# Hive timestamp is interpreted as UNIX timestamp in seconds*
days = lambda i: i * 86400

Finally, the query:

w = (Window()
     .partitionBy(col("id"))
     .orderBy(col("start").cast("timestamp").cast("long"))
     .rangeBetween(-days(7), 0))

df.select(col("*"), mean("some_value").over(w).alias("mean")).show()

## +---+----------+----------+------------------+
## | id|     start|some_value|              mean|
## +---+----------+----------+------------------+
## |  1|2015-01-01|      20.0|              20.0|
## |  1|2015-01-06|      10.0|              15.0|
## |  1|2015-01-07|      25.0|18.333333333333332|
## |  1|2015-01-12|      30.0|21.666666666666668|
## |  2|2015-01-01|       5.0|               5.0|
## |  2|2015-01-03|      30.0|              17.5|
## |  2|2015-02-01|      20.0|              20.0|
## +---+----------+----------+------------------+

Far from pretty but works.
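
If this pattern comes up often, it can be wrapped in a small helper. A sketch only; the name days_window and its signature are mine, not part of any Spark API:

from pyspark.sql import Window
from pyspark.sql.functions import col

def days_window(partition_col, order_col, n_days):
    # Same trick as above: cast the date column to seconds since the epoch
    # so RANGE BETWEEN can take a plain numeric offset.
    return (Window
            .partitionBy(col(partition_col))
            .orderBy(col(order_col).cast("timestamp").cast("long"))
            .rangeBetween(-n_days * 86400, 0))

# e.g. df.select("*", mean("some_value").over(days_window("id", "start", 7)).alias("mean"))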


* Hive Language Manual, Types

answered Sep 17 '22 by zero323

Fantastic solution @zero323. If you want to operate with minutes instead of days, as I have to, and you don't need to partition by id, you only need to modify a small part of the code, as shown here:

df.createOrReplaceTempView("df")

spark.sql(
    """SELECT *, sum(total) OVER (
        ORDER BY CAST(reading_date AS timestamp)
        RANGE BETWEEN INTERVAL 45 minutes PRECEDING AND CURRENT ROW
     ) AS sum_total FROM df""").show()
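
The same thing can be done with the DataFrame API by reusing the seconds trick from the accepted answer. A sketch, assuming a df with the reading_date and total columns used above (45 minutes = 2700 seconds):

from pyspark.sql.window import Window
from pyspark.sql.functions import col, sum as sum_

# No partitionBy here, so the whole dataset is treated as one window
# (Spark will warn that this moves all data to a single partition).
w = (Window
     .orderBy(col("reading_date").cast("timestamp").cast("long"))
     .rangeBetween(-45 * 60, 0))

df.select("*", sum_("total").over(w).alias("sum_total")).show()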
answered Sep 19 '22 by pabloverd