PySpark windowing over datetimes and including windows containing no rows in the results

I'm trying to aggregate my data by taking the sum over every 30 seconds. I would also like to know when the result for a window is zero, which happens when there are no rows in that 30-second window.

Here's a minimal working example illustrating the result I would like with pandas, and where it falls short with pyspark.

Input data

import pandas as pd
from pyspark.sql import functions as F

df = pd.DataFrame(
    [
        (17, "2017-03-10T15:27:18+00:00"),
        (13, "2017-03-10T15:27:29+00:00"),
        (25, "2017-03-10T15:27:30+00:00"),
        (101, "2017-03-10T15:29:00+00:00"),
        (99, "2017-03-10T15:29:29+00:00")
    ],
    columns=["dollars", "timestamp"],
)
df["timestamp"] = pd.to_datetime(df["timestamp"])
print(df)
   dollars                 timestamp
0       17 2017-03-10 15:27:18+00:00
1       13 2017-03-10 15:27:29+00:00
2       25 2017-03-10 15:27:30+00:00
3      101 2017-03-10 15:29:00+00:00
4       99 2017-03-10 15:29:29+00:00

Pandas solution

With pandas, we can use resample to bin the data into 30-second windows and then apply sum over each window (note the results for 2017-03-10 15:28:00+00:00 and 2017-03-10 15:28:30+00:00, which contain no rows):

desired_result = df.set_index("timestamp").resample("30S").sum()
desired_result
                            dollars
timestamp   
2017-03-10 15:27:00+00:00   30
2017-03-10 15:27:30+00:00   25
2017-03-10 15:28:00+00:00   0
2017-03-10 15:28:30+00:00   0
2017-03-10 15:29:00+00:00   200
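
As an aside, a minimal sketch on the same df of how to tell windows with no rows apart from windows whose dollars genuinely sum to zero: resample can report a row count alongside the sum, and count == 0 marks the empty windows.

# Sketch: count == 0 flags the 30-second windows that contain no rows at all.
df.set_index("timestamp").resample("30S")["dollars"].agg(["sum", "count"])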

PySpark near solution

In pyspark, we can use pyspark.sql.functions.window to group over 30-second windows (adapted, with thanks, from this stack answer), but this misses the windows that contain no rows:

spark: pyspark.sql.session.SparkSession  # I expect you to have set up your session...
sdf = spark.createDataFrame(df)
sdf.groupby(
    F.window("timestamp", windowDuration="30 seconds", slideDuration="30 seconds")
).agg(F.sum("dollars")).display()  # .display() is Databricks-specific; use .show() elsewhere
window                                                                          sum(dollars)
{"start":"2017-03-10T15:27:30.000+0000","end":"2017-03-10T15:28:00.000+0000"}   25
{"start":"2017-03-10T15:27:00.000+0000","end":"2017-03-10T15:27:30.000+0000"}   30
{"start":"2017-03-10T15:29:00.000+0000","end":"2017-03-10T15:29:30.000+0000"}   200

Question

How do I get pyspark to return results for time windows where there are no rows (as pandas does)?


1 Answer

You can use timestamp arithmetic, as described in this answer (I recommend taking a look at it, as it goes into the details). In your case it would be:

from pyspark.sql import functions as F

seconds = 30
# Floor each timestamp to the start of its 30-second bucket (seconds since the epoch).
epoch = (F.col("timestamp").cast("timestamp").cast("bigint") / seconds).cast(
    "bigint"
) * seconds
df = spark.createDataFrame(
    [
        (17, "2017-03-10T15:27:18+00:00"),
        (13, "2017-03-10T15:27:29+00:00"),
        (25, "2017-03-10T15:27:30+00:00"),
        (101, "2017-03-10T15:29:00+00:00"),
        (99, "2017-03-10T15:29:29+00:00"),
    ],
    ["dollars", "timestamp"],
).withColumn("epoch", epoch)

min_epoch, max_epoch = df.select(F.min("epoch"), F.max("epoch")).first()

# Reference frame with one row per 30-second bucket between the first and last bucket.
ref = spark.range(min_epoch, max_epoch + seconds, seconds).toDF("epoch")

(
    ref.join(df, "epoch", "left")  # left join keeps the buckets with no matching rows
    .withColumn("ts_resampled", F.timestamp_seconds("epoch"))  # Spark 3.1+
    .groupBy("ts_resampled")
    .sum("dollars")
    .orderBy("ts_resampled")
    .fillna(0, subset=["sum(dollars)"])  # empty buckets sum to null; replace with 0
    .show(truncate=False)
)

Output

+-------------------+------------+
|ts_resampled       |sum(dollars)|
+-------------------+------------+
|2017-03-10 12:27:00|30          |
|2017-03-10 12:27:30|25          |
|2017-03-10 12:28:00|0           |
|2017-03-10 12:28:30|0           |
|2017-03-10 12:29:00|200         |
+-------------------+------------+
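
A follow-up sketch (reusing the ref, df and seconds defined above, and again assuming Spark 3.1+ for timestamp_seconds): if you prefer explicit start/end columns like F.window produces, you can derive the window end from the bucket start, since every bucket is exactly seconds wide.

(
    ref.join(df, "epoch", "left")
    .groupBy("epoch")
    # empty buckets have only null dollars, so their sum is null; coalesce to 0
    .agg(F.coalesce(F.sum("dollars"), F.lit(0)).alias("dollars"))
    .select(
        F.timestamp_seconds("epoch").alias("window_start"),
        # each bucket is `seconds` wide, so the end is simply start + seconds
        F.timestamp_seconds(F.col("epoch") + seconds).alias("window_end"),
        "dollars",
    )
    .orderBy("window_start")
    .show(truncate=False)
)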

