Is saving a HUGE dask dataframe into parquet possible?

I have a dataframe made up of 100,000+ rows, and each row has 100,000 columns, totaling 10,000,000,000 float values.

I previously saved the values to a tab-separated CSV file. I can read it in successfully on a 50-core Xeon machine with 250GB RAM, and I am trying to write it out as a .parq directory like this:

The floats in huge.csv were saved as strings, and the file is 125GB.

import dask.dataframe as dd
filename = 'huge.csv'
df = dd.read_csv(filename, delimiter='\t', sample=500000000)  # sample: number of bytes used to infer column dtypes
df.to_parquet('huge.parq')

It has been writing to huge.parq for close to a week, the directory has only reached 14GB, and it seems like the .to_parquet call is not going to finish any time soon.

free -mh shows that there is still memory available, but writing the .parq directory is tremendously slow:

$ free -mh
              total        used        free      shared  buff/cache   available
Mem:           251G         98G         52G         10M        101G        152G
Swap:          238G          0B        238G

The questions are:

  • Given the size of the dataframe and machine, is it feasible to save the dask dataframe to a parquet file at all?

  • Is it normal for dask and fastparquet to take so long to save huge dataframes?

  • Is there some way to estimate the time it will take to save a parquet file?

asked May 26 '17 by alvas

1 Answer

As discussed in the comments above, there is no theoretical reason that .to_parquet() cannot cope with your data. However, the number of columns is extremely large, and because there is an overhead associated with each one, it is not surprising that the process is taking a long time; this is not a typical use case.
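
For a rough sense of where that overhead comes from, here is a back-of-the-envelope sketch; the number of row groups is a hypothetical example value, not something taken from your setup:

n_columns = 100000      # from the question
n_row_groups = 100      # hypothetical example
column_chunks = n_columns * n_row_groups
print(column_chunks)    # 10,000,000 column-chunk metadata entries to write into the parquet footer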

It sounds like your data is best thought of as an array rather than a table. There are array storage formats that let you chunk along every dimension, for instance zarr, which also supports various compression and pre-filtering operations that can make efficient use of disk space. (Other formats such as HDF5 are also popular for a task like this.)

An example of how to store a 10k x 10k array:

import dask.array as da
import zarr
arr = da.random.random(size=(10000, 10000), chunks=(1000, 1000))  # lazy 10k x 10k array in 1000 x 1000 blocks
z = zarr.open_array('z.zarr', shape=(10000, 10000), chunks=(1000, 1000), mode='w', dtype='float64')  # on-disk store with matching chunks
arr.store(z)  # writes each dask block into the corresponding zarr chunk

and now z.zarr/ contains 100 data-file chunks.
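
Reading the data back is just as simple; a minimal sketch, reusing the path from the example above, that maps the store into a dask array without loading everything into memory:

import dask.array as da
arr = da.from_zarr('z.zarr')            # lazily maps the on-disk chunks
arr[:1000, :1000].mean().compute()      # compute on any slice on demand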

In your case, the tricky part is reading the data in, since you don't know a priori the number of rows. You could use

df = dd.read_csv(...)      # as in the question
n_rows = len(df)           # get the number of rows (this forces a pass over the data)
z = zarr.open_array(...)   # provide dtype, shape and chunks appropriately
df.values.store(z)

or it may be more efficient to wrap np.loadtxt with dask.delayed to forgo the dataframe stage.
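
A minimal sketch of that dask.delayed route, assuming the dimensions from the question; the file name, chunk size and the assumption that the row count divides evenly into chunks are all illustrative:

import numpy as np
import dask
import dask.array as da
import zarr

filename = 'huge.csv'               # assumed path
n_rows, n_cols = 100000, 100000     # dimensions from the question
rows_per_chunk = 1000               # illustrative; tune for memory

# One lazy np.loadtxt call per block of rows.
blocks = [
    da.from_delayed(
        dask.delayed(np.loadtxt)(filename, delimiter='\t',
                                 skiprows=i, max_rows=rows_per_chunk),
        shape=(rows_per_chunk, n_cols),
        dtype='float64',
    )
    for i in range(0, n_rows, rows_per_chunk)
]
arr = da.concatenate(blocks, axis=0)

# Chunk the zarr store so each dask block writes to whole zarr chunks only.
z = zarr.open_array('z.zarr', mode='w', shape=(n_rows, n_cols),
                    chunks=(rows_per_chunk, 1000), dtype='float64')
arr.store(z)

Note that each np.loadtxt call still has to read and discard the skipped rows, so for a 125GB file you would probably want to pre-split the CSV into one file per chunk; the structure of the graph stays the same.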

answered Oct 14 '22 by mdurant