Dask dataframes: time series partitions




I have a time series pandas dataframe that I want to partition by month and year. My thought was to build a list of datetimes to serve as the partition boundaries, but the break doesn't happen at 0:00 on the first of the month.

da=dd.from_pandas(df, npartitions=1)

How do I set the partitions to start at each month? I tried npartitions=len(monthly_partitions), but I realize that is wrong, as it may not split at the start of each month. How can I ensure it partitions on the first date of the month?


Using da = da.repartition(freq='1M') appears to have resampled the data from 10-minute to 1-minute intervals; see below:

Dask DataFrame Structure:
Open    High    Low Close   Vol OI  VI  
2008-05-04 18:00:00 float64 float64 float64 float64 int64   int64   float64 int32
2008-05-04 18:01:00 ... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ... ...
2017-12-01 16:49:00 ... ... ... ... ... ... ... ...
2017-12-01 16:50:00 ... ... ... ... ... ... ... ...
Dask Name: repartition-merge, 10074101 tasks


Here is the code to reproduce the problem:

import numpy as np
import pandas as pd
import dask.dataframe as dd

# Four months of 10-minute timestamps used as the index
ts = pd.date_range("2015-01-01 00:00", "2015-05-01 23:50", freq="10min")
# Random integer data in four columns, indexed by time
df = pd.DataFrame(np.random.randint(0, 100, size=(len(ts), 4)), columns=list('ABCD'), index=ts)

1 Answer

Assuming your dataframe is already indexed by time, you should be able to use the repartition method to accomplish this.

df = df.repartition(freq='1M')

Edit after MCVE above

(thanks for adding the minimal and complete example!)

Interesting, this looks like a bug, either in pandas or dask. I assumed that '1M' would mean one month, as it does in pd.date_range:

In [12]: pd.date_range('2017-01-01', '2017-12-15', freq='1M')
DatetimeIndex(['2017-01-31', '2017-02-28', '2017-03-31', '2017-04-30',
               '2017-05-31', '2017-06-30', '2017-07-31', '2017-08-31',
               '2017-09-30', '2017-10-31', '2017-11-30'],
              dtype='datetime64[ns]', freq='M')

And yet, when passed to pd.Timedelta, it means one minute:

In [13]: pd.Timedelta('1M')
Out[13]: Timedelta('0 days 00:01:00')

In [14]: pd.Timedelta('1m')
Out[14]: Timedelta('0 days 00:01:00')

So it's hanging because it's trying to make around 43200 times more partitions than you intended :)
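
For reference, the 43200 figure is the number of minutes in a 30-day month. A quick sketch of that arithmetic, plus the rough count of one-minute spans across the date range in the question's example (the variable names here are only for illustration):

```python
import pandas as pd

# One-minute intervals that fit in a nominal 30-day month
minutes_per_month = pd.Timedelta(days=30) // pd.Timedelta(minutes=1)

# One-minute intervals spanned by the example index in the question
start = pd.Timestamp("2015-01-01 00:00")
end = pd.Timestamp("2015-05-01 23:50")
minutes_in_example = (end - start) // pd.Timedelta(minutes=1)

print(minutes_per_month, minutes_in_example)
```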

We should file a bug report for this (do you have any interest in doing this?). A short term workaround would be to specify divisions yourself explicitly.

In [17]: divisions = pd.date_range('2015-01-01', '2015-05-01', freq='1M').tolist()
    ...: divisions[0] = ddf.divisions[0]
    ...: divisions[-1] = ddf.divisions[-1]
    ...: ddf.repartition(divisions=divisions)
Dask DataFrame Structure:
                         A      B      C      D
2015-01-01 00:00:00  int64  int64  int64  int64
2015-02-28 00:00:00    ...    ...    ...    ...
2015-03-31 00:00:00    ...    ...    ...    ...
2015-05-01 23:50:00    ...    ...    ...    ...
Dask Name: repartition-merge, 7 tasks
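
Note that the divisions in this output fall on month ends, and overwriting the first and last entries leaves only three partitions. Since the question asks for breaks at 00:00 on the first of each month, here is a minimal sketch that builds month-start divisions with pandas' 'MS' (month start) frequency instead. The helper name month_start_divisions is hypothetical, and the sketch assumes a sorted DatetimeIndex; it keeps the first and last timestamps as the outer divisions so they match the existing ones.

```python
import pandas as pd


def month_start_divisions(index):
    """Return division points: first timestamp, each month start strictly inside, last timestamp."""
    first, last = index.min(), index.max()
    # normalize() drops the time of day so the generated month starts fall at 00:00
    starts = pd.date_range(first.normalize(), last, freq="MS")
    # Keep only boundaries strictly inside the range to avoid duplicate divisions
    inner = [ts for ts in starts if first < ts < last]
    return [first, *inner, last]


# Same index as in the question's reproduction code
ts = pd.date_range("2015-01-01 00:00", "2015-05-01 23:50", freq="10min")
divisions = month_start_divisions(ts)
for d in divisions:
    print(d)
```

The resulting list could then be passed as ddf.repartition(divisions=divisions), in the same way as the workaround above, assuming ddf has known divisions on its datetime index.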
