
Why is computing the shape on an indexed Parquet file so slow in dask?

I have created a Parquet file from multiple Parquet files located in the same folder. Each file corresponds to a partition.

Parquet files are created in different processes (using Python concurrent.futures). Here is an example of the code I run in one process:

# `df` is a standard Pandas DataFrame with
# 22 columns of different types and at most 100e3 rows.

# Set the index
df.set_index("cid", inplace=True)

# Write to single file
fastparquet.write(fpath, df, compression='snappy', file_scheme='simple')

df contains at most 100e3 rows (and 22 columns) and is indexed on an integer index (called cid).

Then I have created the two metadata files using:

# `data_paths` contains the list of all the Parquet data files
# created in multiple processes.
fastparquet.writer.merge(data_paths, verify_schema=True)

and indeed _metadata and _common_metadata are correctly created in the folder containing all the Parquet files.
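As a quick sanity check (a minimal sketch, assuming `dataset_path` is the folder above), the merged footer can be inspected without reading any column data:

import fastparquet

# Opening the folder picks up the merged _metadata file;
# only footers are read, no column data.
pf = fastparquet.ParquetFile(dataset_path)
print(len(pf.row_groups))  # typically one (or more) row groups per partition file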

I naively thought that because the data are indexed and/or the dataset has metadata files, getting basic information such as the size of the data should be fast. Yet the following, for example, takes forever:

import dask.dataframe as dd

# `dataset_path` is the path to the folder
# containing all the Parquet files created above
# and the metadata files.
# It contains ~100-200 individual Parquet files
# for a total of ~60,000,000 rows
data = dd.read_parquet(dataset_path)
data.shape[0].compute()

Is that expected?

Also note that most of the columns are int64 or float64, and a few of them are object (strings of different sizes).

asked Jan 20 '26 by hadim

2 Answers

Unfortunately, the optimisation to extract the length of the dataframe from the Parquet metadata doesn't yet exist. Instead, dask loads every partition into memory and measures its length. You will notice that this is faster if you select a single column (or the index):

len(data[onecolumn])

However, you are quite right that for the special case of parquet, the length is known beforehand from one or more sets of metadata, and it would be nice to be able to get it in one go. Please feel free to request this feature on the Dask issue tracker. For now, you can use the count and columns attributes of fastparquet.ParquetFile.
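For reference, a minimal sketch of that fastparquet route (assuming `dataset_path` is the folder containing the `_metadata` file, as in the question):

import fastparquet

# Open the dataset via its footer/metadata only; no column data is read.
pf = fastparquet.ParquetFile(dataset_path)

print(pf.columns)  # list of column names
print(pf.count)    # total number of rows across all row groups
                   # (in newer fastparquet versions this is a method: pf.count())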

answered Jan 23 '26 by mdurant


Personally, I access the Parquet metadata using fastparquet directly, not Dask.

There's a lot of data in the metadata, so it's worth playing around with.

I further note that you can put the fastparquet operations in a delayed function to read the parquet metadata in parallel using dask if you have a lot of files. As an example:

import dask
import fastparquet

@dask.delayed
def read_pf(path_to_parquet_file):
    # Open the file footer only and collect its per-column statistics and basic info.
    pf = fastparquet.ParquetFile(path_to_parquet_file)
    all_stats = pf.statistics.copy()
    all_info = pf.info.copy()
    return all_stats, all_info
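One possible way to drive this, assuming `data_paths` is the list of individual Parquet file paths from the question (the variable names are illustrative):

# Build one delayed task per file and run them in parallel.
tasks = [read_pf(p) for p in data_paths]
results = dask.compute(*tasks)

# `results` is a tuple of (statistics, info) pairs, one per file.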
answered Jan 23 '26 by dan