I have the following dask dataframe created from Castra:
import dask.dataframe as dd
df = dd.from_castra('data.castra', columns=['user_id','ts','text'])
Yielding:
                    user_id                  ts text
ts
2015-08-08 01:10:00    9235 2015-08-08 01:10:00    a
2015-08-08 02:20:00    2353 2015-08-08 02:20:00    b
2015-08-08 02:20:00    9235 2015-08-08 02:20:00    c
2015-08-08 04:10:00    9235 2015-08-08 04:10:00    d
2015-08-08 08:10:00    2353 2015-08-08 08:10:00    e
What I'm trying to do is group by user_id and ts, resample the timestamps into 3-hour periods, and concatenate the text values within each period.
Example output:
                             text
user_id ts
9235    2015-08-08 00:00:00    ac
        2015-08-08 03:00:00     d
2353    2015-08-08 00:00:00     b
        2015-08-08 06:00:00     e
I tried the following:
df.groupby(['user_id','ts'])['text'].sum().resample('3H', how='sum').compute()
And got the following error:
TypeError: Only valid with DatetimeIndex, TimedeltaIndex or PeriodIndex
I tried passing set_index('ts') in the pipe, but it doesn't seem to be an attribute of Series.
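As far as I can tell, the chain fails because groupby(...).sum() returns a Series indexed by the (user_id, ts) group keys rather than a DatetimeIndex, and a pandas Series has no set_index method. A small pandas-only sketch of the dead end (the two-row frame here is invented for illustration):

import pandas as pd

# Hypothetical two-row stand-in for the real data, pandas-only.
df = pd.DataFrame({
    'user_id': [9235, 2353],
    'ts': pd.to_datetime(['2015-08-08 01:10:00', '2015-08-08 02:20:00']),
    'text': ['a', 'b'],
})

s = df.groupby(['user_id', 'ts'])['text'].sum()
# s now carries a (user_id, ts) MultiIndex, so both follow-ups fail:
#   s.resample('3h')   -> TypeError: Only valid with DatetimeIndex, ...
#   s.set_index('ts')  -> AttributeError: 'Series' object has no attribute 'set_index'
# Flattening with s.reset_index().set_index('ts') would restore a DatetimeIndex,
# but a plain resample on that would mix every user into the same bins.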
Any ideas on how to achieve this?
TL;DR: If it makes the problem easier, I'm also able to change the format of the Castra DB I created. The implementation I currently have was largely taken from this great post.
I set the index (in the to_df() function) as follows:
df.set_index('ts', drop=False, inplace=True)
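A minimal sketch of what drop=False does (sample values invented), which is why ts appears both as the index and as a column in the frame above:

import pandas as pd

# One-row illustration: drop=False keeps 'ts' as an ordinary column
# in addition to promoting it to the index.
frame = pd.DataFrame({
    'user_id': [9235.0],
    'ts': pd.to_datetime(['2015-08-08 01:10:00']),
    'text': ['a'],
})
frame.set_index('ts', drop=False, inplace=True)
print(frame.index)    # DatetimeIndex(['2015-08-08 01:10:00'], ..., name='ts')
print(frame.columns)  # Index(['user_id', 'ts', 'text'], dtype='object')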
And have:
with BZ2File(os.path.join(S.DATA_DIR, filename)) as f:
    batches = partition_all(batch_size, f)
    # toolz.peek grabs the first frame (the Castra template) without
    # consuming the rest of the lazily-mapped sequence
    df, frames = peek(map(self.to_df, batches))
    castra = Castra(S.CASTRA, template=df, categories=categories)
    castra.extend_sequence(frames, freq='3h')
Here are the resulting dtypes:
ts         datetime64[ns]
text       object
user_id    float64
If we can assume that each user_id group can fit in memory then I recommend using dask.dataframe to do the outer groupby, but then using pandas to do the operations within each group, something like the following:
def per_group(blk):
    return blk.groupby('ts').text.resample('3H', how='sum')

df.groupby('user_id').apply(per_group, columns=['ts', 'text']).compute()
This decouples the two hard things into the two different projects: dask.dataframe handles gathering each user's rows together, while pandas handles the datetime resampling within each group.
Ideally dask.dataframe would write the per-group function for you automatically. At the moment dask.dataframe does not intelligently handle multi-indexes, or resampling on top of multi-column groupbys, so the automatic solution isn't yet available. Still, it's quite possible to fall back to pandas for the per-block computation while still using dask.dataframe to prepare the groups accordingly.
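For reference, here is a pandas-only sketch of the same per-group logic against the question's sample data, written for a newer pandas (the how='sum' resample keyword from 2015-era pandas has since been removed; the frame construction and the ''.join aggregation are my choices, not from the original answer):

import pandas as pd

# Rebuild the sample frame from the question; set_index('ts', drop=False)
# keeps ts as both the index and a column, as in the Castra template.
df = pd.DataFrame({
    'user_id': [9235, 2353, 9235, 9235, 2353],
    'ts': pd.to_datetime(['2015-08-08 01:10:00', '2015-08-08 02:20:00',
                          '2015-08-08 02:20:00', '2015-08-08 04:10:00',
                          '2015-08-08 08:10:00']),
    'text': list('abcde'),
}).set_index('ts', drop=False)

# Within each user, bin the timestamps into 3-hour windows and join the text.
out = (df.groupby('user_id')
         .apply(lambda g: g.resample('3h')['text'].agg(''.join)))
out = out[out != ''].dropna()  # drop empty windows ('' or NaN, depending on pandas version)
print(out)
# user_id  ts
# 2353     2015-08-08 00:00:00     b
#          2015-08-08 06:00:00     e
# 9235     2015-08-08 00:00:00    ac
#          2015-08-08 03:00:00     d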