 

How do I execute a multi-threaded `merge()` with dask? How do I use multiple cores via qsub?

I've just begun using dask, and I'm still fundamentally confused about how to do simple pandas tasks with multiple threads, or on a cluster.

Let's take pandas.merge() with dask dataframes.

import dask.dataframe as dd

df1 = dd.read_csv("file1.csv")
df2 = dd.read_csv("file2.csv")

df3 = dd.merge(df1, df2)

Now, let's say I were to run this on my laptop, with 4 cores. How do I assign 4 threads to this task?

It appears the correct way to do this is:

dask.set_options(get=dask.threaded.get)
df3 = dd.merge(df1, df2).compute()

Will this use as many threads as exist (i.e. all 4 cores with shared memory on my laptop)? How do I set the number of threads?

Let's say I am at a facility with 100 cores. How do I submit this in the same manner as one would submit jobs to the cluster with qsub? (Similar to running tasks on clusters via MPI?)

dask.set_options(get=dask.threaded.get)
df3 = dd.merge(df1, df2).compute()
asked Oct 14 '16 by ShanZhengYang


1 Answer

Single machine scheduling

Dask.dataframe will use the threaded scheduler by default with as many threads as you have logical cores in your machine.

You can control the number of threads, or the Pool implementation, with keyword arguments to the .compute() method.
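For example (a minimal sketch; num_workers and pool are keyword arguments the threaded scheduler accepted in this era of dask, forwarded through .compute()):

from multiprocessing.pool import ThreadPool

import dask.dataframe as dd

df1 = dd.read_csv("file1.csv")
df2 = dd.read_csv("file2.csv")

# Cap the threaded scheduler at four threads for this computation
df3 = dd.merge(df1, df2).compute(num_workers=4)

# Or hand the scheduler an explicit thread pool
df3 = dd.merge(df1, df2).compute(pool=ThreadPool(4))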

Distributed machine scheduling

You can use dask.distributed to deploy dask workers across many nodes in a cluster. One way to do this with qsub is to start a dask-scheduler locally:

$ dask-scheduler
Scheduler started at 192.168.1.100:8786

And then use qsub to launch many dask-worker processes, pointed at the reported address:

$ qsub dask-worker 192.168.1.100:8786 ... <various options>

As of yesterday there is an experimental package that can do this on any DRMAA-enabled system (which includes SGE/qsub-like systems): https://github.com/dask/dask-drmaa
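A rough sketch of how that package is intended to be used (the DRMAACluster class and start_workers method come from that repository's README; treat them as assumptions, since the package is experimental and may change):

from dask_drmaa import DRMAACluster
from dask.distributed import Client

cluster = DRMAACluster()  # submits worker jobs through the system's DRMAA interface
client = Client(cluster)  # connect a client to the cluster's scheduler
cluster.start_workers(4)  # ask the resource manager for four dask-worker jobs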

After you have done this you can create a dask.distributed.Client object, which will take over as the default scheduler:

from dask.distributed import Client
c = Client('192.168.1.100:8786')  # now computations run by default on the cluster
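Putting this together with the original question: the merge itself is unchanged, only the scheduler behind .compute() changes (a sketch, reusing the example scheduler address from above):

import dask.dataframe as dd
from dask.distributed import Client

c = Client('192.168.1.100:8786')    # address of the running dask-scheduler

df1 = dd.read_csv("file1.csv")
df2 = dd.read_csv("file2.csv")
df3 = dd.merge(df1, df2).compute()  # executed by the dask-workers on the cluster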

Multi-threaded performance

Note that as of Pandas version 0.19 the GIL still isn't released for pd.merge, so I wouldn't expect a huge speed boost from using multiple threads. If this is important to you then I recommend putting in a comment here: https://github.com/pandas-dev/pandas/issues/13745

answered by MRocklin