
Remove empty partitions in Dask

Tags: python, dask

When loading data from CSV files, some files cannot be loaded, which results in empty partitions. I would like to remove all empty partitions, since some methods do not seem to work well with them. I have tried repartitioning: repartition(npartitions=10), for example, works, but a larger value can still leave empty partitions.

What's the best way of achieving this? Thanks.

morganics asked Dec 14 '17 11:12


2 Answers

I've found that filtering a Dask dataframe, e.g., by date, often results in empty partitions. If you're having trouble using a dataframe with empty partitions, here's a function, based on MRocklin's guidance, to cull them:

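import dask.dataframe as dd

def cull_empty_partitions(df):
    # Row count of every partition (this triggers a computation).
    lengths = list(df.map_partitions(len).compute())
    df_delayed = df.to_delayed()
    # Keep only the delayed partitions that contain at least one row.
    df_delayed_new = [part for part, n in zip(df_delayed, lengths) if n > 0]
    # Rebuild only if something was dropped and at least one partition
    # remains; from_delayed cannot build a dataframe from an empty list.
    if df_delayed_new and len(df_delayed_new) < len(df_delayed):
        df = dd.from_delayed(df_delayed_new, meta=df._meta)
    return df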
tpegbert answered Sep 27 '22 18:09


For anyone working with Bags (not DataFrames), this function will do the trick:

import dask.bag

def cull_empty_partitions(bag):
    """
    When a bag is created by filtering or grouping a different bag,
    it retains the original bag's partition count, even if many of
    the partitions become empty.
    Those extra partitions add overhead, so it's nice to discard them.
    This function drops the empty partitions.
    """
    bag = bag.persist()
    def get_len(partition):
        # If the bag is the result of bag.filter(),
        # then each partition is actually a 'filter' object,
        # which has no __len__.
        # In that case, we must convert it to a list first.
        if hasattr(partition, '__len__'):
            return len(partition)
        return len(list(partition))
    partition_lengths = bag.map_partitions(get_len).compute()

    # Convert bag partitions into a list of 'delayed' objects
    lengths_and_partitions = zip(partition_lengths, bag.to_delayed())

    # Keep only the partitions that contain at least one item
    partitions = [p for n, p in lengths_and_partitions if n > 0]

    # Convert the list of delayed objects back into a Bag.
    return dask.bag.from_delayed(partitions)
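
The same idea can be tried on a small bag. The following is a minimal sketch under stated assumptions, not the answer's exact function: the toy data and the helper name `count_items` are hypothetical, and `scheduler="synchronous"` is passed only so the snippet runs as a plain script without starting worker processes.

```python
import dask.bag as db

# Hypothetical toy data: the numbers 0..19 spread over 5 partitions.
bag = db.from_sequence(range(20), npartitions=5)

# filter() keeps all 5 partitions even though most of them become empty.
filtered = bag.filter(lambda x: x >= 16)

def count_items(partition):
    # Works for lists and for lazy iterators such as 'filter' objects.
    return sum(1 for _ in partition)

lengths = filtered.map_partitions(count_items).compute(scheduler="synchronous")

# Keep only the delayed partitions that contain at least one item.
parts = [p for p, n in zip(filtered.to_delayed(), lengths) if n > 0]
culled = db.from_delayed(parts)

print("partitions before:", filtered.npartitions)
print("partitions after:", culled.npartitions)
print(sorted(culled.compute(scheduler="synchronous")))
```

Unlike the function above, this sketch does not persist the bag, so the filter is evaluated once for counting and again when the culled bag is computed.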
Stuart Berg answered Sep 27 '22 18:09