 

Repartition Dask DataFrame to get even partitions

I have a Dask DataFrame whose index (client_id) is not unique. Repartitioning and resetting the index ends up with very uneven partitions - some contain only a few rows, others hundreds of thousands. For instance, the following code:

for p in range(ddd.npartitions):
    print(len(ddd.get_partition(p)))

prints out something like this:

55
17
5
41
51
1144
4391
75153
138970
197105
409466
415925
486076
306377
543998
395974
530056
374293
237
12
104
52
28
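
Side note: each len(ddd.get_partition(p)) call in the loop above runs its own computation. A quicker sketch that gathers all partition lengths in a single pass:

# One-pass alternative (sketch): map each partition to its length, so a
# single compute() returns every partition size at once.
part_sizes = ddd.map_partitions(len).compute()
print(part_sizes.tolist())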

My DataFrame is one-hot encoded and has over 500 columns, so the larger partitions don't fit in memory. I would like to repartition the DataFrame into evenly sized partitions. Do you know an efficient way to do this?

EDIT 1

A simple reproduction:

import numpy as np
import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({'x': np.arange(0, 10000), 'y': np.arange(0, 10000)})
df2 = pd.DataFrame({'x': np.append(np.arange(0, 4995), np.arange(5000, 10000, 1000)),
                    'y2': np.arange(0, 10000, 2)})
dd_df = dd.from_pandas(df, npartitions=10).set_index('x')
dd_df2 = dd.from_pandas(df2, npartitions=5).set_index('x')
new_ddf = dd_df.merge(dd_df2, how='right')
#new_ddf = new_ddf.reset_index().set_index('x')
#new_ddf = new_ddf.repartition(npartitions=2)
new_ddf.divisions
for p in range(new_ddf.npartitions):
    print(len(new_ddf.get_partition(p)))

Note the last partitions (a single element each):

1000
1000
1000
1000
995
1
1
1
1
1

Even when the commented-out lines are uncommented, the partitions remain uneven in size.

EDIT 2: Workaround

A simple workaround can be achieved with the following code. Is there a more elegant way to do this (more in the Dask way)?

def repartition(ddf, npartitions=None):
    """Repartition `ddf` on index-value boundaries so partitions stay under MAX_PART_SIZE bytes."""
    MAX_PART_SIZE = 100*1024

    if npartitions is None:
        npartitions = ddf.npartitions

    # Estimated size of a single row in bytes (sum of the column dtype sizes).
    one_row_size = sum(dt.itemsize for dt in ddf.dtypes)
    length = len(ddf)

    # Keep the requested partition count unless partitions would exceed MAX_PART_SIZE.
    requested_part_size = length/npartitions*one_row_size
    if requested_part_size <= MAX_PART_SIZE:
        n_parts = npartitions
    else:
        n_parts = length*one_row_size/MAX_PART_SIZE

    chunksize = int(length/n_parts)

    # Rows per index value; new divisions must fall on index-value boundaries.
    vc = ddf.index.value_counts().to_frame(name='count').compute().sort_index()

    # Greedily accumulate index values until a chunk reaches `chunksize` rows.
    vsum = 0
    divisions = [ddf.divisions[0]]
    for i, v in vc.iterrows():
        vsum += v['count']
        if vsum > chunksize:
            divisions.append(i)
            vsum = 0
    divisions.append(ddf.divisions[-1])

    return ddf.repartition(divisions=divisions, force=True)
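
A hypothetical call against the reproduction above (the npartitions value is only illustrative):

# Sketch: rebalance the merged frame from EDIT 1 and inspect the new sizes.
balanced = repartition(new_ddf, npartitions=4)
print(balanced.map_partitions(len).compute().tolist())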
asked Oct 04 '18 by Szymon



1 Answer

You're correct that .repartition won't do the trick since it doesn't handle any of the logic for computing divisions and just tries to combine the existing partitions wherever possible. Here's a solution I came up with for the same problem:

import numpy as np
import dask.dataframe as dd


def _rebalance_ddf(ddf):
    """Repartition dask dataframe to ensure that partitions are roughly equal size.

    Assumes `ddf.index` is already sorted.
    """
    if not ddf.known_divisions:  # e.g. for read_parquet(..., infer_divisions=False)
        ddf = ddf.reset_index().set_index(ddf.index.name, sorted=True)
    # Per-partition counts of each index value, combined into one pandas Series.
    index_counts = ddf.map_partitions(lambda _df: _df.index.value_counts().sort_index()).compute()
    # Reconstruct the sorted index locally from the counts.
    index = np.repeat(index_counts.index, index_counts.values)
    divisions, _ = dd.io.io.sorted_division_locations(index, npartitions=ddf.npartitions)
    return ddf.repartition(divisions=divisions)
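
For example, applied to the merged frame from the question (a sketch, reusing new_ddf from EDIT 1):

balanced = _rebalance_ddf(new_ddf)
print(balanced.map_partitions(len).compute().tolist())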

The internal function sorted_division_locations does what you want already, but it only works on an actual list-like, not a lazy dask.dataframe.Index. The function above therefore avoids pulling the full index in case there are many duplicates, and instead just gets the counts and reconstructs the index locally from those.
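
To illustrate, sorted_division_locations splits a plain sorted sequence into partition boundaries (a rough sketch; the exact return values may vary between dask versions):

# Assumes dask.dataframe is imported as dd, as above. Given a sorted sequence
# with duplicates, the function returns candidate division values plus their
# positional offsets, without splitting a run of equal values across partitions.
divs, locs = dd.io.io.sorted_division_locations(
    ['A', 'A', 'A', 'A', 'B', 'B', 'B', 'C'], npartitions=3)
# divs holds boundary values (e.g. something like ['A', 'B', 'C', 'C']) and
# locs the matching positions (e.g. [0, 4, 7, 8]); values are indicative only.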

If your dataframe is so large that even the index won't fit in memory then you'd need to do something even more clever.

answered Oct 10 '22 by bnaul