 

What does the 'pyspark.sql.functions.window' function's 'startTime' argument do?

In the official docs there is just a simple explanation:

The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. For example, in order to have hourly tumbling windows that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15... provide startTime as 15 minutes.
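
For reference, that documented case corresponds to a call like the sketch below (a minimal, hypothetical example; the DataFrame and column names are made up for illustration):

# Sketch of the documented example: hourly tumbling windows that begin
# 15 minutes past each hour (12:15-13:15, 13:15-14:15, ...).
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("2017-01-09 12:20:00", 1), ("2017-01-09 13:40:00", 1)],
    ["ts", "val"],
).withColumn("ts", F.col("ts").cast("timestamp"))

(df.groupBy(F.window("ts", "1 hour", startTime="15 minutes"))
   .sum("val")
   .orderBy("window.start")
   .show(truncate=False))
# The window starts land on 12:15:00 and 13:15:00, not on the full hour.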

But I want to know how it works with all the arguments.

For example:

import datetime
import pprint
from pyspark.sql.functions import window, sum

ts_list = map(lambda x: datetime.datetime(2017, 1, 9, 9, 0, 10) + datetime.timedelta(seconds=x), range(30))
rdd = spark.sparkContext.parallelize(ts_list).map(lambda x: (x, 1))
df = spark.createDataFrame(rdd, schema=['dt', 'val'])
# windowDuration='5 seconds', slideDuration='4 seconds', startTime='3 seconds'
win = df.groupBy(window("dt", "5 seconds", "4 seconds", "3 seconds")).agg(sum("val").alias("sum"))
pprint.pprint(win.select(win['window']['start'].cast('string').alias('start'),
                         win['window']['end'].cast('string').alias('end')).collect())

output:

[Row(start=u'2017-01-09 09:00:19', end=u'2017-01-09 09:00:24'),                 
 Row(start=u'2017-01-09 09:00:35', end=u'2017-01-09 09:00:40'),
 Row(start=u'2017-01-09 09:00:27', end=u'2017-01-09 09:00:32'),
 Row(start=u'2017-01-09 09:00:07', end=u'2017-01-09 09:00:12'),
 Row(start=u'2017-01-09 09:00:31', end=u'2017-01-09 09:00:36'),
 Row(start=u'2017-01-09 09:00:39', end=u'2017-01-09 09:00:44'),
 Row(start=u'2017-01-09 09:00:11', end=u'2017-01-09 09:00:16'),
 Row(start=u'2017-01-09 09:00:23', end=u'2017-01-09 09:00:28'),
 Row(start=u'2017-01-09 09:00:15', end=u'2017-01-09 09:00:20')]

So why do the windows start at these times?

asked Jan 09 '17 by Zhang Tong

2 Answers

It has nothing to do with when your data starts. Of course the first window will only appear once you have some data in that window, but the startTime has nothing to do with your data. As the documentation says, startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. If you create a window like this:
w = F.window("date_field", "7 days", startTime='6 days')

Spark will generate windows of 7 days starting from 1970-01-06:

1970-01-06 19:00:00, 1970-01-13 19:00:00
1970-01-13 19:00:00, 1970-01-20 19:00:00
1970-01-20 19:00:00, 1970-01-27 19:00:00
...
2017-05-16 19:00:00, 2017-05-23 19:00:00
(if you continue calculating you get to this date) ...
But you will only see the windows that overlap the dates in your DataFrame. The 19:00:00 appears because my timezone is UTC-05.

If you create a window like this:

w = F.window("date_field", "7 days", startTime='2 days')

Spark will generate windows of 7 days starting from 1970-01-02:

1970-01-02 19:00:00, 1970-01-09 19:00:00
1970-01-09 19:00:00, 1970-01-16 19:00:00
...
2017-05-19 19:00:00, 2017-05-26 19:00:00
(if you continue calculating you get to this date)
...

Again, you will only see the windows that overlap the dates in your DataFrame.

So, how can you calculate the right startTime for the windows of your data?
You just need to compute the number of days between your desired start date and 1970-01-01, divide it by the length of your window, and take the remainder. That remainder, in days, is your startTime offset.


I will explain it with an example. Assume that you need your windows to start at 2017-05-21 and that the window length is 7 days. I will create a dummy DataFrame for the example:

import time
from datetime import datetime

from pyspark.sql import Row
from pyspark.sql import functions as F

row = Row("id", "date_field", "value")
df = sc.parallelize([
    row(1, "2017-05-23", 5.0),
    row(1, "2017-05-26", 10.0),
    row(1, "2017-05-29", 4.0),
    row(1, "2017-06-10", 3.0)]).toDF()

start_date = datetime(2017, 5, 21, 19, 0, 0)  # 19:00:00 because of my timezone
days_since_1970_to_start_date = int(time.mktime(start_date.timetuple()) / 86400)
offset_days = days_since_1970_to_start_date % 7

w = F.window("date_field", "7 days", startTime='{} days'.format(offset_days))

df.groupby("id", w).agg(F.sum("value")).orderBy("window.start").show(10, False)

You will get:

+---+------------------------------------------+----------+
|id |window                                    |sum(value)|
+---+------------------------------------------+----------+
|1  |[2017-05-21 19:00:00, 2017-05-28 19:00:00]|15.0      |
|1  |[2017-05-28 19:00:00, 2017-06-04 19:00:00]|4.0       |
|1  |[2017-06-04 19:00:00, 2017-06-11 19:00:00]|3.0       |
+---+------------------------------------------+----------+
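
For reference, the offset arithmetic above can be sanity-checked in pure Python without depending on the machine's local timezone. This is just a sketch: it assumes, as in this answer's UTC-05 timezone, that the desired boundary 2017-05-21 19:00:00 local equals 2017-05-22 00:00:00 UTC.

# Sketch: recompute the startTime offset using an explicit UTC datetime.
from datetime import datetime, timezone

epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
desired_start = datetime(2017, 5, 22, 0, 0, tzinfo=timezone.utc)

days_since_epoch = (desired_start - epoch).days  # 17308
offset_days = days_since_epoch % 7               # 4, i.e. startTime='4 days'
print(days_since_epoch, offset_days)

Since 17308 - 4 = 17304 is an exact multiple of 7, the origin of epoch plus 4 days (1970-01-05 00:00:00 UTC) plus a whole number of 7-day steps lands exactly on 2017-05-22 00:00:00 UTC, which is the 2017-05-21 19:00:00 boundary shown in the table above.
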
answered Oct 18 '22 by Camilo Rodriguez


Let's go step by step.

  • Your data starts at 2017-01-09 09:00:10:

    df.orderBy("dt").show(3, False)
    
    +---------------------+---+
    |dt                   |val|
    +---------------------+---+
    |2017-01-09 09:00:10.0|1  |
    |2017-01-09 09:00:11.0|1  |
    |2017-01-09 09:00:12.0|1  |
    +---------------------+---+
    
  • The first full hour is 2017-01-09 09:00:00.0:

    from pyspark.sql.functions import min as min_, date_format
    (df
       .groupBy()
       .agg(date_format(min_("dt"), "yyyy-MM-dd HH:00:00"))
       .show(1, False))
    
    +-----------------------------------------+
    |date_format(min(dt), yyyy-MM-dd HH:00:00)|
    +-----------------------------------------+
    |2017-01-09 09:00:00                      |
    +-----------------------------------------+
    
  • Therefore the first window will start at 2017-01-09 09:00:03, which is 2017-01-09 09:00:00 + startTime (3 seconds), and end at 2017-01-09 09:00:08 (2017-01-09 09:00:00 + startTime + windowDuration).

    This window is empty (there is no data in the range [09:00:03, 09:00:08)).

  • The first (and the second) data point falls into the next window, [09:00:07.0, 09:00:12.0), which starts at 2017-01-09 09:00:00 + startTime + 1 * slideDuration.

    win.orderBy("window.start").show(3, False)
    
    +---------------------------------------------+---+
    |window                                       |sum|
    +---------------------------------------------+---+
    |[2017-01-09 09:00:07.0,2017-01-09 09:00:12.0]|2  |
    |[2017-01-09 09:00:11.0,2017-01-09 09:00:16.0]|5  |
    |[2017-01-09 09:00:15.0,2017-01-09 09:00:20.0]|5  |
    +---------------------------------------------+---+
    

    Subsequent windows start at 2017-01-09 09:00:00 + startTime + n * slideDuration for n = 1, 2, ...; the sketch below illustrates this rule.
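
To make that rule concrete, here is a small pure-Python sketch (not Spark's internals; the helper name is made up for illustration) that lists the sliding windows containing a given timestamp:

# Sketch: which sliding windows contain a given timestamp?
# Windows start at origin + startTime + n * slideDuration and span windowDuration.
from datetime import datetime, timedelta

def windows_for(ts, origin, start_time, slide, duration):
    """Yield (start, end) for every window that contains ts."""
    n = (ts - origin - start_time) // slide   # last window starting at or before ts
    for i in range(n, -1, -1):
        start = origin + start_time + i * slide
        if start <= ts < start + duration:
            yield (start, start + duration)
        elif start + duration <= ts:
            break

origin = datetime(2017, 1, 9, 9, 0, 0)        # the first full hour above
first_point = datetime(2017, 1, 9, 9, 0, 10)  # the first data point
print(list(windows_for(first_point, origin,
                       start_time=timedelta(seconds=3),
                       slide=timedelta(seconds=4),
                       duration=timedelta(seconds=5))))
# The only match is the [09:00:07, 09:00:12) window; [09:00:03, 09:00:08) misses it.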

answered Oct 18 '22 by zero323