I have recently begun looking at Dask for big data. I have a question on efficiently applying operations in parallel.
Say I have some sales data like this:
customerKey  productKey  transactionKey  grossSales  netSales   unitVolume  volume  transactionDate
-----------  ----------  --------------  ----------  ---------  ----------  ------  -------------------
      20353         189          219548   0.921058   0.921058            1       1  2017-02-01 00:00:00
    2596618         189          215015   0.709997   0.709997            1       1  2017-02-01 00:00:00
   30339435         189          215184   0.918068   0.918068            1       1  2017-02-01 00:00:00
   32714675         189          216656   0.751007   0.751007            1       1  2017-02-01 00:00:00
   39232537         189          218180   0.752392   0.752392            1       1  2017-02-01 00:00:00
   41722826         189          216806   0.0160143  0.0160143           1       1  2017-02-01 00:00:00
   46525123         189          219875   0.469437   0.469437            1       1  2017-02-01 00:00:00
   51024667         189          215457   0.244886   0.244886            1       1  2017-02-01 00:00:00
   52949803         189          215413   0.837739   0.837739            1       1  2017-02-01 00:00:00
   56526281         189          220261   0.464716   0.464716            1       1  2017-02-01 00:00:00
   56776211         189          220017   0.272027   0.272027            1       1  2017-02-01 00:00:00
   58198475         189          215058   0.805758   0.805758            1       1  2017-02-01 00:00:00
   63523098         189          214821   0.479798   0.479798            1       1  2017-02-01 00:00:00
   65987889         189          217484   0.122769   0.122769            1       1  2017-02-01 00:00:00
   74607556         189          220286   0.564133   0.564133            1       1  2017-02-01 00:00:00
   75533379         189          217880   0.164387   0.164387            1       1  2017-02-01 00:00:00
   85676779         189          215150   0.0180961  0.0180961           1       1  2017-02-01 00:00:00
   88072944         189          219071   0.492753   0.492753            1       1  2017-02-01 00:00:00
   90233554         189          216118   0.439582   0.439582            1       1  2017-02-01 00:00:00
   91949008         189          220178   0.1893     0.1893              1       1  2017-02-01 00:00:00
   91995925         189          215159   0.566552   0.566552            1       1  2017-02-01 00:00:00
I want to do a few different groupbys: first a groupby-apply on customerKey, then another groupby-sum on customerKey and a column which will be the result of the previous groupby-apply.
The most efficient way I can think of doing this would be to split the dataframe into partitions, each holding a chunk of customer keys. So, for example, I could split the dataframe into 4 chunks with a partition scheme like (pseudocode):
partition by customerKey % 4
Then I could use map_partitions to do these groupby-applies within each partition and finally return the result, as in the sketch below. However, it seems Dask forces me to do a shuffle for each groupby I want to do.
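A rough sketch of what I am picturing (my_customer_func is just a stand-in for the real per-customer logic, ddf is a Dask DataFrame holding the sales data above, and the partition-by-customerKey step is exactly the part I do not know how to express):

import pandas as pd

def my_customer_func(group: pd.DataFrame) -> float:
    # Stand-in for the real per-customer computation.
    return group['netSales'].sum() / group['unitVolume'].sum()

def per_partition(pdf: pd.DataFrame) -> pd.DataFrame:
    # First groupby-apply on customerKey...
    derived = pdf.groupby('customerKey').apply(my_customer_func).rename('derived').reset_index()
    # ...then a groupby-sum on customerKey over netSales plus the derived column.
    merged = pdf.merge(derived, on='customerKey')
    return merged.groupby('customerKey')[['netSales', 'derived']].sum()

# Expected shape of each partition's result, so Dask does not have to infer it.
meta = pd.DataFrame(
    {'netSales': pd.Series(dtype='float64'), 'derived': pd.Series(dtype='float64')},
    index=pd.Index([], dtype='int64', name='customerKey'),
)

# If every partition held complete customers (e.g. hashed by customerKey % 4),
# this could run partition by partition with no shuffle at all:
result = ddf.map_partitions(per_partition, meta=meta).compute()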
Is there no way to repartition based on the value of a column?
At the moment this takes ~45s with 4 workers on a dataframe of only ~80,000 rows. I am planning to scale this up to a dataframe of trillions of rows, and already this seems like it is going to scale horribly.
Am I missing something fundamental to Dask?
From the Dask DataFrame documentation: A Dask DataFrame is a large parallel DataFrame composed of many smaller Pandas DataFrames, split along the index. These Pandas DataFrames may live on disk for larger-than-memory computing on a single machine, or on many different machines in a cluster.
The Pandas API is very large. Dask DataFrame does not attempt to implement many Pandas features or any of the more exotic data structures like NDFrames. Operations that were slow on Pandas, like iterating through row by row, remain slow on Dask DataFrame. See the DataFrame API documentation for a more extensive list.
However, Dask DataFrame does not implement the entire Pandas interface. Operations that require a shuffle of the data are slow-ish unless they happen along the index:

Set index: df.set_index(df.x)
groupby-apply not on index (with anything): df.groupby(df.x).apply(myfunc)
Join not on the index: dd.merge(df1, df2, on='name')
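Concretely, the distinction looks like this (a minimal sketch using the column names from the question; per_customer is a toy stand-in, and df_indexed stands for the same data after customerKey has been made the sorted index, as shown below):

def per_customer(group):
    # Toy stand-in for the real per-customer computation.
    return group['netSales'].sum()

# Grouping on an ordinary column: Dask has to shuffle rows between partitions
# so that all rows for a given customer end up in the same place.
shuffled = df.groupby('customerKey').apply(per_customer, meta=('x', 'f8'))

# Grouping on a sorted index with known divisions: each partition already holds
# a contiguous range of customers, so no shuffle is needed.
local = df_indexed.groupby('customerKey').apply(per_customer, meta=('x', 'f8'))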
You can set your column to be the index
df = df.set_index('customerKey')
This will sort your data by that column and track which ranges of values are in which partition. As you note, this is likely to be an expensive operation, so you'll probably want to save the result somewhere.
Either in memory
df = df.persist()
or on disk
df.to_parquet('...')
df = dd.read_parquet('...')
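Putting it together, a rough end-to-end sketch (the per-customer function is a placeholder, and persist is used here just to keep the sorted result around): pay for the sort once, and every subsequent groupby on customerKey lines up with the partitioning, so it runs partition by partition without another shuffle.

import dask
import pandas as pd

def per_customer(group: pd.DataFrame) -> float:
    # Placeholder for the real per-customer logic.
    return group['netSales'].sum() / group['unitVolume'].sum()

# One expensive sort/shuffle up front, then keep the sorted, range-partitioned
# result in memory (writing to parquet and reading it back works similarly).
df = df.set_index('customerKey')
df = df.persist()

# Both groupbys now align with the index, so each partition is processed
# independently.
derived = df.groupby('customerKey').apply(per_customer, meta=('derived', 'f8'))
totals = df.groupby('customerKey')['netSales'].sum()

derived, totals = dask.compute(derived, totals)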