 

Dask dataframe split partitions based on a column or function

I have recently begun looking at Dask for big data. I have a question on efficiently applying operations in parallel.

Say I have some sales data like this:

customerKey    productKey    transactionKey    grossSales  netSales      unitVolume    volume transactionDate
-----------  --------------  ----------------  ----------  --------      ----------    ------ --------------------
    20353           189            219548     0.921058     0.921058              1         1  2017-02-01 00:00:00
  2596618           189            215015     0.709997     0.709997              1         1  2017-02-01 00:00:00
 30339435           189            215184     0.918068     0.918068              1         1  2017-02-01 00:00:00
 32714675           189            216656     0.751007     0.751007              1         1  2017-02-01 00:00:00
 39232537           189            218180     0.752392     0.752392              1         1  2017-02-01 00:00:00
 41722826           189            216806     0.0160143    0.0160143             1         1  2017-02-01 00:00:00
 46525123           189            219875     0.469437     0.469437              1         1  2017-02-01 00:00:00
 51024667           189            215457     0.244886     0.244886              1         1  2017-02-01 00:00:00
 52949803           189            215413     0.837739     0.837739              1         1  2017-02-01 00:00:00
 56526281           189            220261     0.464716     0.464716              1         1  2017-02-01 00:00:00
 56776211           189            220017     0.272027     0.272027              1         1  2017-02-01 00:00:00
 58198475           189            215058     0.805758     0.805758              1         1  2017-02-01 00:00:00
 63523098           189            214821     0.479798     0.479798              1         1  2017-02-01 00:00:00
 65987889           189            217484     0.122769     0.122769              1         1  2017-02-01 00:00:00
 74607556           189            220286     0.564133     0.564133              1         1  2017-02-01 00:00:00
 75533379           189            217880     0.164387     0.164387              1         1  2017-02-01 00:00:00
 85676779           189            215150     0.0180961    0.0180961             1         1  2017-02-01 00:00:00
 88072944           189            219071     0.492753     0.492753              1         1  2017-02-01 00:00:00
 90233554           189            216118     0.439582     0.439582              1         1  2017-02-01 00:00:00
 91949008           189            220178     0.1893       0.1893                1         1  2017-02-01 00:00:00
 91995925           189            215159     0.566552     0.566552              1         1  2017-02-01 00:00:00

I want to do a few different groupbys: first a groupby-apply on customerKey, then another groupby-sum on customerKey and a column that will be the result of the previous groupby-apply.

The most efficient way I can think of doing this would be to split the dataframe into partitions, each holding a chunk of customer keys. For example, I could split it into 4 chunks with a partition scheme like (pseudocode):

partition by customerKey % 4

Then I could use map_partitions to run these groupby-applies on each partition and finally return the result; a rough sketch of what I mean is below. However, it seems Dask forces me to do a shuffle for every groupby I want to do.
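Something along these lines (the per-customer function is just a placeholder, and I'm assuming each partition would already hold all rows for its customer keys):

    import pandas as pd
    import dask.dataframe as dd

    # ddf = dd.read_parquet(...)  # however the sales data was loaded

    # Placeholder per-customer computation: tag each row with the
    # customer's total netSales. The real groupby-apply would go here.
    def per_customer(pdf):
        pdf = pdf.copy()
        pdf['customerTotal'] = pdf.groupby('customerKey')['netSales'].transform('sum')
        return pdf

    # If every partition held complete customerKey groups
    # (e.g. partitioned by customerKey % 4), this could run
    # partition-by-partition with no shuffle at all.
    result = ddf.map_partitions(per_customer)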

Is there no way to repartition based on the value of a column?

At the moment this takes ~45 s with 4 workers on a dataframe of only ~80,000 rows. I am planning to scale this up to a dataframe of trillions of rows, and it already looks like this will scale horribly.

Am I missing something fundamental to Dask?

Asked Mar 28 '18 by Roger Thomas


1 Answer

You can set your column to be the index

df = df.set_index('customerKey')

This will sort your data by that column and track which ranges of values are in which partition. As you note, this is likely to be an expensive operation, so you'll probably want to save it somewhere.

Either in memory

df = df.persist()

or on disk

df.to_parquet('...')
df = dd.read_parquet('...')
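
Once the index is set and saved, groupbys keyed on customerKey can use the known divisions instead of reshuffling. A minimal sketch of the follow-up, where my_func is only a placeholder for the question's per-customer logic:

    import dask.dataframe as dd

    # Reload the indexed data; path elided as above.
    df = dd.read_parquet('...')

    # Check whether Dask kept the customerKey divisions from set_index.
    print(df.known_divisions)

    # Placeholder for whatever per-customer computation is needed.
    def my_func(pdf):
        return pdf['netSales'].sum() / pdf['grossSales'].sum()

    # With customerKey as the sorted index, both of these can work
    # partition-by-partition rather than triggering a full shuffle.
    # (Dask may warn that `meta` is not specified for apply; passing
    # it explicitly silences the warning.)
    per_cust = df.groupby('customerKey').apply(my_func)      # groupby-apply
    sums     = df.groupby('customerKey')['netSales'].sum()   # groupby-sum

    result = sums.compute()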
Answered Oct 30 '22 by MRocklin