I'm converting a large text file to HDF storage in hopes of faster data access. The conversion itself works fine, but reading from the CSV file is not done in parallel. It is really slow (about 30 minutes for a 1 GB text file on an SSD, so my guess is that it is not IO-bound).
Is there a way to have it read in multiple threads in parallel? In case it matters, I'm currently forced to run under Windows.
    from dask import dataframe as ddf

    df = ddf.read_csv("data/Measurements*.csv",
                      sep=';',
                      parse_dates=["DATETIME"],
                      blocksize=1000000,
                      )

    df = df.categorize(['Type', 'Condition'])  # categorize returns a new dataframe
    df.to_hdf("data/data.hdf", "Measurements", 'w')
You can read and write Parquet files to and from Dask DataFrames with the fastparquet and pyarrow engines. Both engines work well for most workloads.
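As a minimal sketch (the CSV paths come from the question above; the Parquet path and the choice of engine are arbitrary assumptions), writing and reading Parquet with an explicit engine looks like this:

    import dask.dataframe as dd

    df = dd.read_csv("data/Measurements*.csv", sep=';', parse_dates=["DATETIME"])

    # Write the dataframe as a Parquet dataset with one engine...
    df.to_parquet("data/measurements.parquet", engine="pyarrow")

    # ...and read it back with either engine.
    df2 = dd.read_parquet("data/measurements.parquet", engine="fastparquet")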
Dask runs faster than pandas for this query, even when the most inefficient column type is used, because it parallelizes the computations. pandas only uses 1 CPU core to run the query. My computer has 4 cores and Dask uses all the cores to run the computation.
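For instance, a rough sketch of that comparison (the aggregation and the 'Value' column are hypothetical; only 'Type' and the file layout come from the question above) might look like:

    import pandas as pd
    import dask.dataframe as dd

    # pandas: the whole file is read and aggregated on a single CPU core.
    pdf = pd.read_csv("data/Measurements1.csv", sep=';')
    pandas_result = pdf.groupby("Type")["Value"].mean()

    # Dask: the same query is split into partitions and run on all cores.
    dask_df = dd.read_csv("data/Measurements*.csv", sep=';', blocksize=1000000)
    dask_result = dask_df.groupby("Type")["Value"].mean().compute()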
If you love Pandas and Numpy but sometimes struggle with data that does not fit into RAM, then Dask is definitely what you need. Dask supports the Pandas dataframe and Numpy array data structures and can either run on your local computer or be scaled up to run on a cluster.
Piggybacking off of @MRocklin's answer, in newer versions of dask, you can use df.compute(scheduler='processes')
or df.compute(scheduler='threads')
to convert to pandas using multiprocessing or multithreading:
    from dask import dataframe as ddf

    df = ddf.read_csv("data/Measurements*.csv",
                      sep=';',
                      parse_dates=["DATETIME"],
                      blocksize=1000000,
                      )

    df = df.compute(scheduler='processes')     # convert to pandas
    df['Type'] = df['Type'].astype('category')
    df['Condition'] = df['Condition'].astype('category')

    df.to_hdf('data/data.hdf', 'Measurements', format='table', mode='w')
Yes, dask.dataframe can read in parallel. However, you're running into two problems:
1. By default dask.dataframe parallelizes with threads, because most of Pandas can run in parallel in multiple threads (it releases the GIL). pandas.read_csv is an exception, especially if your resulting dataframes use object dtypes for text.
2. Writing to a single HDF file forces sequential computation (it's very hard to write to a single file in parallel); see the sketch after this list for one way around that.
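If you do need to stay with HDF, one workaround for the second problem is to write one HDF file per partition rather than a single file. The sketch below assumes dask's to_hdf accepts a '*' globstring in the output path (replaced by the partition number); the other file names are taken from the question:

    import dask.dataframe as dd

    df = dd.read_csv("data/Measurements*.csv",
                     sep=';',
                     parse_dates=["DATETIME"],
                     blocksize=1000000)

    # '*' in the output path is replaced by the partition number, so each
    # partition writes its own file and the writes no longer serialize on
    # a single output file.
    df.to_hdf("data/data-*.hdf", "Measurements", mode="w")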
Today I would avoid HDF and use Parquet instead. I would probably use the multiprocessing or dask.distributed schedulers to avoid GIL issues on a single machine. The combination of these two should give you full linear scaling.
    import dask.dataframe
    from dask.distributed import Client

    client = Client()

    df = dask.dataframe.read_csv(...)
    df.to_parquet(...)
Because your dataset likely fits in memory, use dask.dataframe.read_csv to load in parallel with multiple processes, then switch immediately to Pandas.
    import dask.dataframe as ddf
    import dask.multiprocessing

    df = ddf.read_csv("data/Measurements*.csv",  # read in parallel
                      sep=';',
                      parse_dates=["DATETIME"],
                      blocksize=1000000,
                      )

    df = df.compute(get=dask.multiprocessing.get)     # convert to pandas
    df['Type'] = df['Type'].astype('category')
    df['Condition'] = df['Condition'].astype('category')

    df.to_hdf('data/data.hdf', 'Measurements', format='table', mode='w')