I have several files whose with a column called idx
and I would like to use it as index. The dataframe obtained has about 13M row. I know that I can read and assign index in this way (which is slow ~40 s)
df = dd.read_parquet("file-*.parq")
df = df.set_index("idx")
or in this other way (which is quick ~40 ms)
df = dd.read_parquet("file-*.parq", index = "idx")
A simple operation as calculate the length is ~4x faster with the second method. What I don't understand is
df.known_divisions
returns True
while on the second is False
. I expected the opposite behaviour. I then did several operations on top of df
and the without a known_division I'm always obtaining better performance. I'm scratching my head to figure out if this is happening on purpose or not.UPDATE
It is not just calculating len
which is faster. In my calculation I create 4 new dataframes using groupby, apply and join several times and these are the timings
| |Load and reindex (s)|Load with index (s)|
|:-----------------|-------------------:|------------------:|
| load | 12.5000 | 0.0124 |
| grp, apply, join | 11.4000 | 6.2700 |
| compute() | 146.0000 | 125.0000 |
| TOTAL | 169.9000 | 131.2820 |
Using dask instead of pandas to merge large data sets The python package dask is a powerful python package that allows you to do data analytics in parallel which means it should be faster and more memory efficient than pandas .
You should aim for partitions that have around 100MB of data each. Additionally, reducing partitions is very helpful just before shuffling, which creates n log(n) tasks relative to the number of partitions. DataFrames with less than 100 partitions are much easier to shuffle than DataFrames with tens of thousands.
When the Dask DataFrame contains data that's split across multiple nodes in a cluster, then compute() may run slowly. It can also cause out of memory errors if the data isn't small enough to fit in the memory of a single machine. Dask was created to solve the memory issues of using pandas on a single machine.
When you use the first method, dask is loading the data, and partitioning the rows by the value of your chosen column (which involves shuffling all of the on-disc chunks) before doing any of the calculation you have asked for. In the case of calculating the length, this is all wasted time, as a knowledge of the index divisions doesn't help at all with that, but further computations involving that index (e.g., join operations) would be much faster.
In the second version, you are asserting that your chosen column is the index, but dask does not shuffle the data without you explicitly asking for it. If it happens to have statistics saved in the parquet metadata, and the max/min of each parquet chunk is such, that they form a monotonic series (i.e., all of the values of 'idx' in the second chunk are greater than all of the values in the first, etc.), then you will have known divisions and optimized performance for certain operations involving the index as before. If those conditions are not met, you have the index column set, but the divisions are not known - which, again, is totally fine for calculating the length.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With