I have a parquet file that is 800K rows x 8.7K columns. I loaded it into a Dask DataFrame:
import dask.dataframe as dd
dask_train_df = dd.read_parquet('train.parquet')
dask_train_df.info()
This yields:
<class 'dask.dataframe.core.DataFrame'>
Columns: 8712 entries, 0 to 8711
dtypes: int8(8712)
When I try to do simple operations like dask_train_df.head() or dask_train_df.loc[2:4].compute(), I get memory errors, even with 17+ GB of RAM.
However, if I do:
import pandas as pd
train = pd.read_parquet('../input/train.parquet')
train.info()
this yields:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 800000 entries, 0 to 799999
Columns: 8712 entries, 0 to 8711
dtypes: int8(8712)
memory usage: 6.5 GB
and I can run train.head() and train.loc[2:4] with no problems, since everything is in memory already.
1) So my question is: why do these simple operations blow up the memory usage with a Dask DataFrame, but work fine when I load everything into memory with a Pandas DataFrame?
I notice that npartitions=1, and I see in the documentation that read_parquet "reads a directory of Parquet data into a Dask.dataframe, one file per partition". In my case, it sounds like I'm losing out on all of the parallelization power of having multiple partitions, but then shouldn't the Dask DataFrame's memory usage be capped by the memory of the single Pandas DataFrame?
2) Also, a side question: if I wanted to parallelize this single parquet file by partitioning it in a Dask DataFrame, how would I do so? I don't see a blocksize parameter in the dd.read_parquet signature. I also tried using the repartition function, but I believe that partitions along the rows, and with a parquet file I would want to partition along the columns?
You can read and write Parquet files to Dask DataFrames with the fastparquet and pyarrow engines. Both engines work fine most of the time.
You should aim for partitions that hold around 100 MB of data each. Additionally, reducing the number of partitions is very helpful just before shuffling, which creates n log(n) tasks relative to the number of partitions. DataFrames with fewer than 100 partitions are much easier to shuffle than DataFrames with tens of thousands.
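For example, here is a rough sketch of steering toward that target with repartition (the partition_size argument and the "100MB" value are illustrative; check the repartition options available in your Dask version):
import dask.dataframe as dd
ddf = dd.read_parquet('train.parquet')
# Ask Dask to rebalance partitions so each holds roughly 100 MB of data;
# partition_size is assumed to be supported by your Dask version.
ddf = ddf.repartition(partition_size="100MB")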
First, I would like to comment that 8712 columns is rather many, and you will find that parsing the schema/metadata may take significant time, never mind the data loading.
When fastparquet loads data, it first allocates a dataframe of sufficient size, then iterates through the columns/chunks (with appropriate overheads, which apparently are small in this case) and assigns values into the allocated dataframe.
When you run a calculation through Dask (any calculation), there can in many cases be intra-task copies in memory of the input variables and other intermediate objects. That is usually not an issue, as the whole data-set should be split into many parts, and the small intermediates' memory overhead is a price worth paying for being able to handle datasets larger than memory. I am not sure at which point you are getting a copy; it may be worth investigating and preventing.
In your case, the whole data-set is a single partition. This will result in a single load task, running in one thread. You will not be getting any parallelism, and any intermediate internal copies apply to the whole dataset. You could load only part of the data by selecting columns, and so manufacture partitions and achieve parallelism that way. However, the typical way to handle parquet data is to make use of "row-group" partitions (i.e., along the index) and multiple files, so the real way to avoid the problem is to use data which is already appropriately partitioned.
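As a minimal sketch of the column-selection idea (the column names below are placeholders for whatever subset you actually need; columns is a regular dd.read_parquet argument):
import dask.dataframe as dd
# Load only a handful of the 8712 columns so each read materialises far less data.
subset = dd.read_parquet('train.parquet', columns=['0', '1', '2'])
print(subset.head())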
Note that since you can load the data directly with fastparquet/pandas, you could probably also save a partitioned version, either with the to_parquet method or fastparquet's write function.
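For instance, a hedged sketch of writing a row-partitioned copy via Dask (the npartitions value and output path are arbitrary choices for illustration; fastparquet's write with row_group_offsets would be the alternative route):
import pandas as pd
import dask.dataframe as dd
train = pd.read_parquet('train.parquet')       # fits in memory (~6.5 GB)
ddf = dd.from_pandas(train, npartitions=32)    # split along the row index
ddf.to_parquet('train_partitioned/', engine='fastparquet')
# Subsequent reads then get one partition per file and real parallelism:
dask_train_df = dd.read_parquet('train_partitioned/')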