ValueError: Not all divisions are known, can't align partitions error on dask dataframe

I have a pandas dataframe with the following columns:

user_id user_agent_id requests

All columns contain integers. I want to perform some operations on them and run them using a dask dataframe. This is what I do:

import dask.dataframe as df  # alias assumed from the df.from_pandas call below

user_profile = cache_records_dataframe[['user_id', 'user_agent_id', 'requests']] \
    .groupby(['user_id', 'user_agent_id']) \
    .size().to_frame(name='appearances') \
    .reset_index()  # I am not sure I can run this on dask dataframe

user_profile_ddf = df.from_pandas(user_profile, npartitions=4)
user_profile_ddf['percent'] = user_profile_ddf.groupby('user_id')['appearances'] \
    .apply(lambda x: x / x.sum(), meta=float)  # percentage of appearances within each user group

But I get the following error:

raise ValueError("Not all divisions are known, can't align "
ValueError: Not all divisions are known, can't align partitions. Please use `set_index` to set the index.

Am I doing something wrong? In pure pandas it works great, but it gets slow for many rows (although they fit in memory), so I want to parallelize the computations.
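Since the data fits in memory, one thing that may be worth trying before parallelizing is replacing the per-group lambda with a vectorized transform. The sketch below is plain pandas, not dask. The sample rows are made up for illustration; only the column names come from the question.

```python
import pandas as pd

# Hypothetical sample rows standing in for cache_records_dataframe.
cache_records_dataframe = pd.DataFrame({
    "user_id":       [1, 1, 1, 2, 2],
    "user_agent_id": [10, 10, 11, 10, 10],
    "requests":      [5, 3, 2, 7, 1],
})

# Count how often each (user_id, user_agent_id) pair appears.
user_profile = (
    cache_records_dataframe
    .groupby(["user_id", "user_agent_id"])
    .size()
    .to_frame(name="appearances")
    .reset_index()
)

# transform("sum") broadcasts each user's total back onto that user's rows,
# so the division is a single vectorized operation instead of a Python lambda per group.
user_total = user_profile.groupby("user_id")["appearances"].transform("sum")
user_profile["percent"] = user_profile["appearances"] / user_total

print(user_profile)
```

Whether this is fast enough depends on the real data, so it is a starting point to measure rather than a guaranteed fix.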

asked Jul 11 '17 09:07 by Apostolos


1 Answer

When creating the dask dataframe, add a call to reset_index():

user_profile_ddf = df.from_pandas(user_profile, npartitions=4).reset_index()
answered Sep 20 '22 16:09 by skibee