Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

pyspark: rolling average using timeseries data

I have a dataset consisting of a timestamp column and a dollars column. I would like to find the average number of dollars per week ending at the timestamp of each row. I was initially looking at the pyspark.sql.functions.window function, but that bins the data by week.

Here's an example:

%pyspark import datetime from pyspark.sql import functions as F  df1 = sc.parallelize([(17,"2017-03-11T15:27:18+00:00"), (13,"2017-03-11T12:27:18+00:00"), (21,"2017-03-17T11:27:18+00:00")]).toDF(["dollars", "datestring"]) df2 = df1.withColumn('timestampGMT', df1.datestring.cast('timestamp'))  w = df2.groupBy(F.window("timestampGMT", "7 days")).agg(F.avg("dollars").alias('avg')) w.select(w.window.start.cast("string").alias("start"), w.window.end.cast("string").alias("end"), "avg").collect() 

This results in two records:

|        start        |          end         | avg | |---------------------|----------------------|-----| |'2017-03-16 00:00:00'| '2017-03-23 00:00:00'| 21.0| |---------------------|----------------------|-----| |'2017-03-09 00:00:00'| '2017-03-16 00:00:00'| 15.0| |---------------------|----------------------|-----| 

The window function binned the time series data rather than performing a rolling average.

Is there a way to perform a rolling average where I'll get back a weekly average for each row with a time period ending at the timestampGMT of the row?

EDIT:

Zhang's answer below is close to what I want, but not exactly what I'd like to see.

Here's a better example to show what I'm trying to get at:

%pyspark from pyspark.sql import functions as F df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00"),                         (13, "2017-03-15T12:27:18+00:00"),                         (25, "2017-03-18T11:27:18+00:00")],                         ["dollars", "timestampGMT"]) df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp')) df = df.withColumn('rolling_average', F.avg("dollars").over(Window.partitionBy(F.window("timestampGMT", "7 days")))) 

This results in the following dataframe:

dollars timestampGMT            rolling_average 25      2017-03-18 11:27:18.0   25 17      2017-03-10 15:27:18.0   15 13      2017-03-15 12:27:18.0   15 

I'd like the average to be over the week proceeding the date in the timestampGMT column, which would result in this:

dollars timestampGMT            rolling_average 17      2017-03-10 15:27:18.0   17 13      2017-03-15 12:27:18.0   15 25      2017-03-18 11:27:18.0   19 

In the above results, the rolling_average for 2017-03-10 is 17, since there are no preceding records. The rolling_average for 2017-03-15 is 15 because it is averaging the 13 from 2017-03-15 and the 17 from 2017-03-10 which falls withing the preceding 7 day window. The rolling average for 2017-03-18 is 19 because it is averaging the 25 from 2017-03-18 and the 13 from 2017-03-10 which falls withing the preceding 7 day window, and it is not including the 17 from 2017-03-10 because that does not fall withing the preceding 7 day window.

Is there a way to do this rather than the binning window where the weekly windows don't overlap?

like image 727
Bob Swain Avatar asked Aug 21 '17 22:08

Bob Swain


People also ask

How do you find the average PySpark?

By using the avg() method, we can get the average value from the column, and finally, we can use the collect() method to get the average from the column. Where, df is the input PySpark DataFrame. column_name is the column to get the average value.

How do you use pandas UDF in PySpark?

Pandas UDFs are user defined functions that are executed by Spark using Arrow to transfer data and Pandas to work with the data, which allows vectorized operations. A Pandas UDF is defined using the pandas_udf as a decorator or to wrap the function, and no additional configuration is required.

How is rolling average calculated in SQL?

The second part of the query (in black text) is the calculation of the rolling average. Similarly to the first example, we use the AVG() window function and the clause OVER(ORDER BY day ROWS BETWEEN 9 PRECEDING AND CURRENT ROW) . This applies the AVG() function to the current row and the nine rows before it.


1 Answers

I figured out the correct way to calculate a moving/rolling average using this stackoverflow:

Spark Window Functions - rangeBetween dates

The basic idea is to convert your timestamp column to seconds, and then you can use the rangeBetween function in the pyspark.sql.Window class to include the correct rows in your window.

Here's the solved example:

%pyspark from pyspark.sql import functions as F from pyspark.sql.window import Window   #function to calculate number of seconds from number of days days = lambda i: i * 86400  df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00"),                         (13, "2017-03-15T12:27:18+00:00"),                         (25, "2017-03-18T11:27:18+00:00")],                         ["dollars", "timestampGMT"]) df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))  #create window by casting timestamp to long (number of seconds) w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0))  df = df.withColumn('rolling_average', F.avg("dollars").over(w)) 

This results in the exact column of rolling averages that I was looking for:

dollars   timestampGMT            rolling_average 17        2017-03-10 15:27:18.0   17.0 13        2017-03-15 12:27:18.0   15.0 25        2017-03-18 11:27:18.0   19.0 
like image 118
Bob Swain Avatar answered Oct 01 '22 12:10

Bob Swain