I've just begun using dask, and I'm still fundamentally confused about how to do simple pandas tasks with multiple threads, or on a cluster. Let's take pandas.merge() with dask dataframes.
import dask.dataframe as dd
df1 = dd.read_csv("file1.csv")
df2 = dd.read_csv("file2.csv")
df3 = dd.merge(df1, df2)
Now, let's say I were to run this on my laptop, with 4 cores. How do I assign 4 threads to this task?
It appears the correct way to do this is:
dask.set_options(get=dask.threaded.get)
df3 = dd.merge(df1, df2).compute()
And will this use as many threads as exist (i.e. as many shared-memory cores as my laptop has, here 4)? How do I set the number of threads?
Let's say I am at a facility with 100 cores. How do I submit this in the same manner as one would submit jobs to a cluster with qsub? (Similar to running tasks on clusters via MPI?)
dask.set_options(get=dask.threaded.get)
df3 = dd.merge(df1, df2).compute()
Dask.dataframe will use the threaded scheduler by default with as many threads as you have logical cores in your machine.
As pointed out in the comments, you can control the number of threads or the Pool implementation with keyword arguments to the .compute() method.
You can use dask.distributed to deploy dask workers across many nodes in a cluster. One way to do this with qsub is to start a dask-scheduler locally:
$ dask-scheduler
Scheduler started at 192.168.1.100:8786
And then use qsub to launch many dask-worker processes, pointed at the reported address:
$ qsub dask-worker 192.168.1.100:8786 ... <various options>
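As a concrete sketch (the script name worker.sh is invented here, and the address and `--nthreads` value would depend on your cluster):

```shell
#!/bin/bash
# worker.sh -- hypothetical job script: one submission starts one worker
# pointed at the scheduler address printed by dask-scheduler above
dask-worker 192.168.1.100:8786 --nthreads 4
```

Submitting it repeatedly (e.g. `for i in $(seq 1 25); do qsub worker.sh; done`) would attach 25 such workers to the scheduler.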
As of yesterday there is an experimental package that can do this on any DRMAA-enabled system (which includes SGE/qsub-like systems): https://github.com/dask/dask-drmaa
After you have done this you can create a dask.distributed.Client object, which will take over as the default scheduler:
from dask.distributed import Client
c = Client('192.168.1.100:8786') # now computations run by default on the cluster
Note that as of Pandas version 0.19 the GIL still isn't released for pd.merge, so I wouldn't expect a huge speed boost from using multiple threads. If this is important to you then I recommend putting in a comment here: https://github.com/pandas-dev/pandas/issues/13745