I have a dataframe of 100,000+ rows, each with 100,000 columns, totalling 10,000,000,000 float values.
I previously saved them in a csv (tab-separated) file, huge.csv, where the floats are stored as strings; the file is 125GB. I managed to read it on a 50-core Xeon machine with 250GB RAM and tried to write it out as a .parq directory like this:
import dask.dataframe as dd
filename = 'huge.csv'
df = dd.read_csv(filename, delimiter='\t', sample=500000000)
df.to_parquet('huge.parq')
It has been writing to huge.parq for close to a week, the directory is only 14GB, and it looks like .to_parquet is not going to finish any time soon. free -mh shows that there is still memory available, but saving the .parq directory is tremendously slow:
$ free -mh
              total        used        free      shared  buff/cache   available
Mem:           251G         98G         52G         10M        101G        152G
Swap:          238G          0B        238G
The questions are:
1. Given the size of the dataframe and machine, is it feasible to save the dask dataframe to a parquet file at all?
2. Is it normal for dask and fastparquet to take so long to save huge dataframes?
3. Is there some way to estimate the time it will take to save a parquet file?
As discussed in the comments above, there is no theoretical reason that .to_parquet()
should not cope with your data. However, the number of columns is extremely large, and because parquet stores separate metadata and data chunks for every column, each one adds overhead, so it is not surprising that the process is taking a long time - this is not the typical use case.
It sounds like your data is best thought of as an array rather than a table. There are array storage mechanisms that allow you to chunk in every dimension, for instance zarr, which also allows for various compression and pre-filtering operations that can make efficient use of disc space. (Other formats like HDF5 are also popular for a task like this.)
An example of how to store a 10k X 10k array:
import dask.array as da
import zarr

# lazy 10k x 10k random array, split into 1000 x 1000 chunks
arr = da.random.random(size=(10000, 10000), chunks=(1000, 1000))
# on-disk zarr array whose chunks match the dask chunks
z = zarr.open_array('z.zarr', shape=(10000, 10000), chunks=(1000, 1000), mode='w', dtype='float64')
# write all the chunks to disk in parallel
arr.store(z)
and now z.zarr/ contains 100 data-file chunks.
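The stored array can later be mapped straight back into dask without loading it all, for example:
import dask.array as da
arr = da.from_zarr('z.zarr')               # lazily wrap the on-disk chunks
print(arr[:100, :100].mean().compute())    # work on a slice without reading everything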
In your case, the tricky part is reading the data in, since you don't know a priori the number of rows. You could use
df = dd.read_csv(..)
len(df)  # get length (this triggers a full pass over the data)
z = zarr.open_array(...)  # provide dtype, shape and chunks appropriately
df.values.store(z)
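Filled in, a rough sketch might look like the following (assuming a dask version that provides DataFrame.to_dask_array(lengths=True); the output path 'huge.zarr' and the chunk sizes are purely illustrative):
import dask.dataframe as dd
import zarr

df = dd.read_csv('huge.csv', delimiter='\t')
# compute the per-partition row counts so the resulting array has known chunks
arr = df.to_dask_array(lengths=True)
# on-disk array with the now-known shape; pick chunks that fit comfortably in memory
z = zarr.open_array('huge.zarr', mode='w', shape=arr.shape,
                    chunks=(1000, 1000), dtype='float64')
arr.store(z)
(Dask arrays also have a to_zarr method that can bundle the last two steps.)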
Alternatively, it may be more efficient to wrap np.loadtxt with dask.delayed to forgo the dataframe stage, as sketched below.
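A minimal sketch of that route, assuming the file has no header row and that the row count has been determined beforehand (e.g. with wc -l); the dimensions and block size here are illustrative:
import numpy as np
import dask
import dask.array as da
import zarr

filename = 'huge.csv'
n_rows, n_cols = 100_000, 100_000   # assumed dimensions; determine n_rows yourself, e.g. wc -l
rows_per_block = 1_000

@dask.delayed
def load_block(skip, nrows):
    # parse one horizontal slice of the text file into a float array
    # (note: skiprows still scans past the earlier lines on every call)
    return np.loadtxt(filename, delimiter='\t', skiprows=skip, max_rows=nrows)

blocks = [
    da.from_delayed(load_block(start, min(rows_per_block, n_rows - start)),
                    shape=(min(rows_per_block, n_rows - start), n_cols),
                    dtype='float64')
    for start in range(0, n_rows, rows_per_block)
]
arr = da.concatenate(blocks, axis=0)   # one dask array, no dataframe stage

z = zarr.open_array('huge.zarr', mode='w', shape=(n_rows, n_cols),
                    chunks=(1000, 1000), dtype='float64')
arr.store(z)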