I have a time-series pandas DataFrame that I want to partition by month and year. My thought was to get a list of datetimes to serve as division points, but the breaks don't happen at 00:00 on the first of the month.
monthly_partitions=np.unique(df.index.values.astype('datetime64[M]')).tolist()
da=dd.from_pandas(df, npartitions=1)
How do I make each partition start at the beginning of a month? I tried npartitions=len(monthly_partitions),
but I realize that is wrong, as it may not split exactly on the month-start dates. How should one ensure the data is partitioned on the first date of each month?
UPDATE:
using da=da.repartition(freq='1M')
split the 10-minute data into one partition per minute rather than per month; see below
Dask DataFrame Structure:
Open High Low Close Vol OI VI
npartitions=5037050
2008-05-04 18:00:00 float64 float64 float64 float64 int64 int64 float64 int32
2008-05-04 18:01:00 ... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ... ...
2017-12-01 16:49:00 ... ... ... ... ... ... ... ...
2017-12-01 16:50:00 ... ... ... ... ... ... ... ...
Dask Name: repartition-merge, 10074101 tasks
UPDATE 2:
Here is the code to reproduce the problem
import pandas as pd
import datetime as dt
import dask as dsk
import numpy as np
import dask.dataframe as dd
ts=pd.date_range("2015-01-01 00:00", "2015-05-01 23:50", freq="10min")
df = pd.DataFrame(np.random.randint(0,100,size=(len(ts),4)), columns=list('ABCD'), index=ts)
ddf=dd.from_pandas(df,npartitions=1)
ddf=ddf.repartition(freq='1M')
ddf
Assuming your dataframe is already indexed by time, you should be able to use the repartition method to accomplish this.
df = df.repartition(freq='1M')
(thanks for adding the minimal and complete example!)
Interesting, this looks like a bug, either in pandas or dask. I assumed that '1M' would mean one month (as it does in pd.date_range):
In [12]: pd.date_range('2017-01-01', '2017-12-15', freq='1M')
Out[12]:
DatetimeIndex(['2017-01-31', '2017-02-28', '2017-03-31', '2017-04-30',
'2017-05-31', '2017-06-30', '2017-07-31', '2017-08-31',
'2017-09-30', '2017-10-31', '2017-11-30'],
dtype='datetime64[ns]', freq='M')
And yet, when passed to pd.Timedelta, it means one minute:
In [13]: pd.Timedelta('1M')
Out[13]: Timedelta('0 days 00:01:00')
In [14]: pd.Timedelta('1m')
Out[14]: Timedelta('0 days 00:01:00')
So it's hanging because it's trying to make roughly 43,200 times as many partitions as you intended (one per minute instead of one per month) :)
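The ambiguity is easy to see side by side (a small illustration, not part of the original answer): in pandas offset-alias context, 'M' means month *end* and 'MS' means month *start*, while Timedelta strings spell minutes as 'min':

```python
import pandas as pd

# Offset aliases: 'M' = month end, 'MS' = month start
month_ends = pd.date_range("2015-01-01", "2015-05-01", freq="M")
month_starts = pd.date_range("2015-01-01", "2015-05-01", freq="MS")
print(month_ends[0], month_starts[0])

# Timedelta strings are a different mini-language: minutes are 'min'
one_minute = pd.Timedelta("1min")
print(one_minute)
```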
We should file a bug report for this (do you have any interest in doing this?). A short-term workaround would be to specify the divisions yourself explicitly.
In [17]: divisions = pd.date_range('2015-01-01', '2015-05-01', freq='1M').tolist()
...: divisions[0] = ddf.divisions[0]
...: divisions[-1] = ddf.divisions[-1]
...: ddf.repartition(divisions=divisions)
...:
Out[17]:
Dask DataFrame Structure:
A B C D
npartitions=3
2015-01-01 00:00:00 int64 int64 int64 int64
2015-02-28 00:00:00 ... ... ... ...
2015-03-31 00:00:00 ... ... ... ...
2015-05-01 23:50:00 ... ... ... ...
Dask Name: repartition-merge, 7 tasks