Can dask parallelize reading from a CSV file?

I'm converting a large text file to HDF storage in hopes of faster data access. The conversion works all right; however, reading from the CSV file is not done in parallel. It is really slow (it takes about 30 minutes for a 1 GB text file on an SSD, so my guess is that it is not IO-bound).

Is there a way to have it read in multiple threads in parallel? Since it might be important: I'm currently forced to run under Windows, just in case that makes any difference.

from dask import dataframe as ddf

df = ddf.read_csv("data/Measurements*.csv",
                  sep=';',
                  parse_dates=["DATETIME"],
                  blocksize=1000000,
                  )

df = df.categorize(['Type',       # categorize returns a new dataframe,
                    'Condition',  # so the result has to be assigned back
                    ])

df.to_hdf("data/data.hdf", "Measurements", 'w')
asked Oct 18 '16 by Magellan88




2 Answers

Piggybacking off of @MRocklin's answer, in newer versions of dask, you can use df.compute(scheduler='processes') or df.compute(scheduler='threads') to convert to pandas using multiprocessing or multithreading:

from dask import dataframe as ddf

df = ddf.read_csv("data/Measurements*.csv",
                  sep=';',
                  parse_dates=["DATETIME"],
                  blocksize=1000000,
                  )

df = df.compute(scheduler='processes')     # convert to pandas

df['Type'] = df['Type'].astype('category')
df['Condition'] = df['Condition'].astype('category')

df.to_hdf('data/data.hdf', 'Measurements', format='table', mode='w')
answered Sep 23 '22 by mgoldwasser


Yes, dask.dataframe can read in parallel. However, you're running into two problems:

Pandas.read_csv only partially releases the GIL

By default, dask.dataframe parallelizes with threads because most of Pandas can run in parallel across multiple threads (it releases the GIL). Pandas.read_csv is an exception, especially if your resulting dataframes use object dtypes for text.

dask.dataframe.to_hdf(filename) forces sequential computation

Writing to a single HDF file will force sequential computation (it's very hard to write to a single file in parallel).
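
If you want to stay with HDF, a possible way around both problems is to compute with the process-based scheduler (so the GIL held by Pandas.read_csv stops mattering) and to put a * in the output filename so that each partition is written to its own HDF file instead of being funnelled into one. The following is only a sketch, reusing the paths and blocksize from the question, and it assumes a dask version that has dask.config and the globstring form of to_hdf:

import dask
from dask import dataframe as ddf

# Process-based scheduler: each partition is parsed in its own process,
# so the GIL in Pandas.read_csv is no longer a bottleneck.
# (Older dask versions used get=dask.multiprocessing.get instead.)
dask.config.set(scheduler='processes')

df = ddf.read_csv("data/Measurements*.csv",
                  sep=';',
                  parse_dates=["DATETIME"],
                  blocksize=1000000,
                  )

# A '*' in the output path writes one HDF file per partition,
# so the writes no longer have to be serialized into a single file.
df.to_hdf("data/data.*.hdf", "Measurements", mode='w')

The trade-off is that you end up with one HDF file per partition rather than a single data.hdf.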

Edit: New solution

Today I would avoid HDF and use Parquet instead. I would probably use the multiprocessing or dask.distributed schedulers to avoid GIL issues on a single machine. The combination of these two should give you full linear scaling.

import dask.dataframe
from dask.distributed import Client

client = Client()

df = dask.dataframe.read_csv(...)
df.to_parquet(...)
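
Spelled out with the arguments from the question (the paths, separator, and blocksize are simply copied from there, and the sketch assumes either pyarrow or fastparquet is installed for the Parquet write), the full pipeline would look roughly like this:

import dask.dataframe as ddf
from dask.distributed import Client

client = Client()   # local cluster of worker processes, avoids the GIL

df = ddf.read_csv("data/Measurements*.csv",
                  sep=';',
                  parse_dates=["DATETIME"],
                  blocksize=1000000,
                  )

df = df.categorize(['Type', 'Condition'])

# Parquet is written one file per partition, so both the read and the
# write happen in parallel across the workers.
df.to_parquet("data/measurements.parquet")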

Solution

Because your dataset likely fits in memory, use dask.dataframe.read_csv to load in parallel with multiple processes, then switch immediately to Pandas.

import dask.dataframe as ddf
import dask.multiprocessing

df = ddf.read_csv("data/Measurements*.csv",  # read in parallel
                  sep=';',
                  parse_dates=["DATETIME"],
                  blocksize=1000000,
                  )

df = df.compute(get=dask.multiprocessing.get)     # convert to pandas

df['Type'] = df['Type'].astype('category')
df['Condition'] = df['Condition'].astype('category')

df.to_hdf('data/data.hdf', 'Measurements', format='table', mode='w')
answered Sep 22 '22 by MRocklin