
Slow Dask performance on CSV date parsing?

I've been doing a lot of text processing on a big pile of files, including large CSVs and lots and lots of little XML files. Sometimes I'm doing aggregate counts, but a lot of the time I'm doing NLP-type work to take a deeper look at what is in these files beyond what is tagged or already structured.

I've been using the multiprocessing library a lot to perform these calculations across multiple CPUs, but I've fallen in love with the ideas behind Dask, and it comes highly recommended both on the net and by coworkers.

I asked a similar question about Dask performance here:

Slow Performance with Python Dask bag?

and MRocklin (https://stackoverflow.com/users/616616/mrocklin) let me know that loading lots of small files likely trashed the performance.

Yet even when I run it on a single largish file (200 MB), I still can't get it to perform very well. Here's an example:

I have a CSV file of roughly 900,000 rows of tweets, and I want to load it quickly and parse the "created_at" field. Here are three ways I've done it, along with the benchmarks for each. I ran this on a 2016 MacBook Pro (i7) with 16 GB of RAM.

import pandas
import dask.dataframe as dd
import multiprocessing

%%time
# Single Threaded, no chunking
d = pandas.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", parse_dates = ["created_at"])
print(len(d))

CPU times: user 2min 31s, sys: 807 ms, total: 2min 32s
Wall time: 2min 32s

%%time
# Multiprocess chunking
def parse_frame_dates(frame):
    frame["created_at"] = pandas.to_datetime(frame["created_at"])
    return frame

d = pandas.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", chunksize = 100000)
frames = multiprocessing.Pool().imap_unordered(parse_frame_dates, d)
td = pandas.concat(frames)
print(len(td))

CPU times: user 5.65 s, sys: 1.47 s, total: 7.12 s
Wall time: 1min 10s

%%time
# Dask Load
d = dd.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", 
                 parse_dates = ["created_at"], blocksize = 10000000).compute()

CPU times: user 2min 59s, sys: 26.2 s, total: 3min 25s
Wall time: 3min 12s

I've gotten these sorts of results in many different Dask comparisons, but even getting this one example to work well would point me in the right direction.

In short, how can I get the best performance out of Dask for these sorts of tasks? Why does it seem to underperform both the single-threaded and multiprocess-chunked approaches above?

asked Oct 17 '22 by Mike Shea


1 Answer

I suspect that the Pandas read_csv datetime parsing code is pure-python, and so won't benefit much from using threads, which is what dask.dataframe uses by default.

You might see better performance when using processes.

I suspect that the following would work faster:

import dask.multiprocessing
import dask.dataframe as dd

dask.set_options(get=dask.multiprocessing.get)  # set processes as default

d = dd.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", 
                parse_dates = ["created_at"], blocksize = 10000000)
len(d)
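
Note (my addition, not part of the original answer): newer Dask releases removed dask.set_options and the per-scheduler get functions, so on current versions the rough equivalent of the line above is:

import dask

# Newer spelling of "use the process-based scheduler by default".
dask.config.set(scheduler="processes")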

The problem with processes is that inter-process communication can become expensive. I'm explicitly computing len(d) above rather than d.compute() in order to avoid having to pickle up all of the pandas dataframes in the worker processes and move them to the main calling process. In practice this is pretty common anyway, as people rarely want the full dataframe, but some computation on the dataframe.
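
To make the point concrete, here is a minimal sketch (my addition, not part of the original answer; it reuses the question's file path and "created_at" column and assumes the process-based scheduler was configured as above): only small aggregates are shipped back to the calling process, not the full parsed dataframe.

import dask
import dask.dataframe as dd

# Assumes the process-based scheduler was set as default, as shown above.
d = dd.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv",
                parse_dates=["created_at"], blocksize=10000000)

# Each of these moves only a scalar or a pair of timestamps between processes,
# not ~900,000 parsed rows.
n_rows = len(d)
first, last = dask.compute(d["created_at"].min(), d["created_at"].max())
print(n_rows, first, last)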

The relevant documentation page is http://dask.readthedocs.io/en/latest/scheduler-choice.html

You might also want to use the distributed scheduler on a single machine rather than use the multiprocessing scheduler. This is also described in the docs referenced above.

$ pip install dask distributed

import dask.dataframe as dd
from dask.distributed import Client

c = Client()  # create processes and set as default

d = dd.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", 
                parse_dates = ["created_at"], blocksize = 10000000)
len(d)
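
A possible follow-up (my addition, not from the answer): once the Client is the default scheduler, you can persist the parsed dataframe in the worker processes and run several computations against it without re-reading the CSV, again pulling back only small results.

# Assumes the Client created above is still running as the default scheduler.
d = d.persist()                         # parsed partitions stay in worker memory
print(len(d))                           # only a row count comes back
print(d["created_at"].max().compute())  # only a single timestamp comes back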
answered Oct 20 '22 by MRocklin