 

How to efficiently submit tasks with large arguments in Dask distributed?


I want to submit functions with Dask that have large (gigabyte scale) arguments. What is the best way to do this? I want to run this function many times with different (small) parameters.

Example (bad)

This uses the concurrent.futures interface. We could use the dask.delayed interface just as easily.

import numpy as np
from dask.distributed import Client

x = np.random.random(size=100000000)  # 800MB array
params = list(range(100))             # 100 small parameters

def f(x, param):
    pass

c = Client()

futures = [c.submit(f, x, param) for param in params]

But this is slower than I would expect or results in memory errors.

MRocklin asked Jan 04 '17



1 Answer

OK, so what's wrong here is that each task contains the numpy array x, which is large. For each of the 100 tasks that we submit, we need to serialize x, send it up to the scheduler, send it over to the worker, and so on.
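As a rough illustration (this check is an addition, not part of the original answer), you can see how much data each submitted task would carry:

# every c.submit(f, x, param) call ships the full array along with the task
print(x.nbytes / 1e6, "MB per task, times", len(params), "tasks")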

Instead, we'll send the array up to the cluster once:

[future] = c.scatter([x])

Now future is a token that points to the array x living on the cluster. We can submit tasks that refer to this remote future, rather than to the numpy array on our local client.

# futures = [c.submit(f, x, param) for param in params]  # sends x each time
futures = [c.submit(f, future, param) for param in params]  # refers to remote x already on cluster

This is now much faster, and lets Dask control data movement more effectively.
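Once the tasks finish, you can pull the results back to the client with gather (a minimal sketch; f above returns None, so substitute a function that returns something useful):

# block until all tasks complete and collect their results locally
results = c.gather(futures)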

Scatter data to all workers

If you expect to need the array x on all of the workers eventually, then you may want to broadcast it to start:

[future] = c.scatter([x], broadcast=True)
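If you want to check where the copies ended up, the client can report which workers hold the data (a small sketch; who_has maps each key to the workers that hold it):

# returns a mapping from the scattered key to the addresses of workers holding a copy
print(c.who_has([future]))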

Use Dask Delayed

Futures work fine with dask.delayed as well. There is no performance benefit here, but some people prefer this interface:

# futures = [c.submit(f, future, param) for param in params]

from dask import delayed
lazy_values = [delayed(f)(future, param) for param in params]
futures = c.compute(lazy_values)
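Here c.compute returns a list of futures, so the results can be collected with c.gather(futures) exactly as shown earlier.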
MRocklin answered Sep 19 '22