Dask dataframes known_divisions and performance

I have several files with a column called idx that I would like to use as the index. The resulting dataframe has about 13M rows. I know that I can read the files and assign the index in this way (which is slow, ~40 s)

df = dd.read_parquet("file-*.parq")
df = df.set_index("idx")

or in this other way (which is quick, ~40 ms)

df = dd.read_parquet("file-*.parq", index="idx")

A simple operation such as calculating the length is ~4x faster with the second method. What I don't understand is:

  • in the first case df.known_divisions returns True, while in the second it is False. I expected the opposite behaviour. I then did several operations on top of df, and without known divisions I always obtain better performance. I'm scratching my head trying to figure out whether this is happening on purpose or not.
  • the number of partitions equals the number of files. How can I set a different number of partitions? (a repartition sketch follows below)
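
For the second bullet, here is a minimal sketch of changing the partition count after loading, assuming dask's repartition method (npartitions=8 is an arbitrary example value, not something from the question):

import dask.dataframe as dd

df = dd.read_parquet("file-*.parq", index="idx")  # one partition per input file by default
df = df.repartition(npartitions=8)                # 8 is an arbitrary example value

print(df.npartitions)
print(df.known_divisions)

Note that repartition with npartitions only splits or concatenates adjacent partitions; it does not sort the data by value, so it will not make known_divisions become True on its own.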

UPDATE: It is not just calculating len that is faster. In my calculation I create 4 new dataframes using groupby, apply and join several times, and these are the timings:

|                  |Load and reindex (s)|Load with index (s)|
|:-----------------|-------------------:|------------------:|
| load             |            12.5000 |            0.0124 |
| grp, apply, join |            11.4000 |            6.2700 |
| compute()        |           146.0000 |          125.0000 |
| TOTAL            |           169.9000 |          131.2820 |
asked Aug 07 '17 by rpanai

People also ask

Is Dask merge faster than pandas?

Using dask instead of pandas to merge large data sets: dask is a Python package that allows you to do data analytics in parallel, which means merging can be faster and more memory efficient than with pandas.

How many partitions should I have in Dask?

You should aim for partitions that hold around 100 MB of data each. Additionally, reducing the number of partitions is very helpful just before shuffling, which creates n log(n) tasks relative to the number of partitions. DataFrames with fewer than 100 partitions are much easier to shuffle than DataFrames with tens of thousands.
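
As a rough, hand-rolled way of checking a dataframe against that guideline (this sketch is not from the original answer), one can estimate the in-memory size of each partition:

import dask.dataframe as dd

df = dd.read_parquet("file-*.parq", index="idx")

# estimated in-memory bytes per partition, to compare with the ~100 MB guideline
sizes = df.map_partitions(lambda part: part.memory_usage(deep=True).sum()).compute()
print(sizes / 1e6)  # roughly MB per partition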

Why is Dask compute slow?

When the Dask DataFrame contains data that's split across multiple nodes in a cluster, then compute() may run slowly. It can also cause out of memory errors if the data isn't small enough to fit in the memory of a single machine. Dask was created to solve the memory issues of using pandas on a single machine.
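
A sketch of the usual workaround is to aggregate before calling compute(), so that only a small result is materialised; the column names below are hypothetical:

import dask.dataframe as dd

df = dd.read_parquet("file-*.parq", index="idx")

n_rows = len(df)  # only a single number comes back to the local process

# "key" and "val" are hypothetical column names used for illustration
# stats = df.groupby("key").val.mean().compute()

# by contrast, df.compute() concatenates every partition into one pandas
# DataFrame in local memory, which is where the slowness and memory errors appear
# full = df.compute()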


1 Answer

When you use the first method, dask is loading the data and partitioning the rows by the value of your chosen column (which involves shuffling all of the on-disc chunks) before doing any of the calculations you have asked for. In the case of calculating the length this is all wasted time, as knowledge of the index divisions doesn't help at all there, but further computations involving that index (e.g., join operations) would be much faster.
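
A sketch of that contrast, using the two loading methods from the question (the join at the end is commented out, and "other" is a hypothetical second dataframe indexed the same way):

import dask.dataframe as dd

df1 = dd.read_parquet("file-*.parq").set_index("idx")  # shuffles up front, divisions known
df2 = dd.read_parquet("file-*.parq", index="idx")      # no shuffle, divisions unknown here

print(df1.known_divisions)  # True
print(df2.known_divisions)  # False in the question's setup

# len() gains nothing from divisions, so df2 wins there, but an index-aligned
# operation can skip irrelevant partitions when divisions are known
# joined = df1.join(other)  # "other": a hypothetical dataframe with the same index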

In the second version, you are asserting that your chosen column is the index, but dask does not shuffle the data without you explicitly asking for it. If statistics happen to be saved in the parquet metadata, and the max/min of each parquet chunk are such that they form a monotonic series (i.e., all of the values of 'idx' in the second chunk are greater than all of the values in the first, etc.), then you will have known divisions and, as before, optimized performance for certain operations involving the index. If those conditions are not met, you have the index column set, but the divisions are not known - which, again, is totally fine for calculating the length.
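
If the files really are already globally sorted on idx, a common way to get known divisions without the full shuffle is to assert the sortedness; a minimal sketch, assuming set_index's sorted=True option:

import dask.dataframe as dd

df = dd.read_parquet("file-*.parq")
# assumes the data is already sorted by "idx" across the files; dask then only
# has to find the partition boundary values rather than shuffle any rows
df = df.set_index("idx", sorted=True)

print(df.known_divisions)  # True

If the exact boundary values are known up front, they can also be passed explicitly through set_index's divisions= argument, which avoids even that small inspection.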

answered by mdurant