How to efficiently send a large numpy array to the cluster with Dask.array

Tags: numpy, dask

I have a large NumPy array on my local machine that I want to parallelize with Dask.array on a cluster:

import numpy as np
x = np.random.random((1000, 1000, 1000))

However, when I use dask.array I find that my scheduler starts taking up a lot of RAM. Why is this? Shouldn't this data go to the workers?

import dask.array as da
x = da.from_array(x, chunks=(100, 100, 100))

from dask.distributed import Client
client = Client(...)
x = x.persist()
Asked by MRocklin


1 Answer

Whenever you persist or compute a Dask collection, that data goes to the scheduler and from there to the workers. If you want to avoid storing data on the scheduler, then you will have to learn how to move data with scatter.
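As a quick illustration, here is a minimal sketch of scatter. Client() here just starts a throwaway local cluster standing in for your real cluster connection, and client.who_has reports which worker ended up holding a piece of data:

import numpy as np
from dask.distributed import Client

client = Client()                  # stand-in for your real cluster connection

x = np.random.random((1000, 1000))
future = client.scatter(x)         # the array travels from the client straight to a worker
print(client.who_has([future]))    # maps the data's key to the worker address holding it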

You have three options:

  1. Don't load data on your client machine
  2. Scatter then chunk
  3. Chunk then scatter

Don't load data on your client machine

The best approach is to include loading data as part of your computation rather than doing it locally.

Before
x = load_array_from_file(fn)  # load into a local numpy array
x = da.from_array(x, chunks=(100, 100, 100))  # split with dask
x = x.persist()

After

import dask

x = dask.delayed(load_array_from_file)(fn)
x = da.from_delayed(x, shape=(1000, 1000, 1000), dtype=float)
x = x.rechunk((100, 100, 100))
x = x.persist()

More information about creating dask arrays is here: http://dask.pydata.org/en/latest/array-creation.html
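If your data can be generated or read remotely rather than loaded from a local file, you can also build the array on the cluster from the start. A minimal sketch using the random data from the question (da.random.random is the collection-level counterpart of np.random.random):

import dask.array as da

# Each worker creates and holds its own chunks; the full array never
# exists on the client or the scheduler.
x = da.random.random((1000, 1000, 1000), chunks=(100, 100, 100))
x = x.persist()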

Scatter then chunk

You can scatter your numpy array directly to a worker:

future = client.scatter(x)        # send the whole array straight to one worker
x = da.from_delayed(future, shape=x.shape, dtype=x.dtype)
x = x.rechunk((100, 100, 100))    # split it into chunks on the cluster
x = x.persist()

This moves your data directly to one worker, then chunks it from there. This is nice because it bypasses the scheduler. However, you are now at risk of data loss if your workers start to fail; this only really matters if you are running on a massively parallel system.

This is also somewhat inefficient because all of your data is on one worker rather than spread out across the cluster. You might call client.rebalance, or read on.
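Continuing from the snippet above, a small sketch of that rebalance call (client.rebalance with no arguments spreads all in-memory data; wait just makes sure the chunks exist before moving them):

from dask.distributed import wait

x = x.persist()
wait(x)             # block until the persisted chunks are actually in memory
client.rebalance()  # spread the in-memory chunks more evenly across workers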

Chunk then scatter

You can chunk your data locally with a local scheduler, then scatter it out to the cluster.

import dask

x = da.from_array(x, chunks=(100, 100, 100))
x = x.persist(get=dask.threaded.get)    # chunk locally (newer dask versions: scheduler="threads")
futures = client.scatter(dict(x.dask))  # scatter the chunks to the cluster
x.dask = futures                        # re-attach the scattered futures as the task graph
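If you would rather not touch the task graph directly, the same idea can be written with a small helper that scatters the array block by block and reassembles a dask array from the futures. This is only a sketch: scatter_array is a hypothetical helper, and it chunks along the first axis only for brevity.

import numpy as np
import dask.array as da

def scatter_array(client, x, chunk=100):
    """Ship a local numpy array to the cluster one block at a time."""
    pieces = []
    for i in range(0, x.shape[0], chunk):
        block = x[i:i + chunk]                      # a plain numpy slice
        fut = client.scatter(block)                 # this block lands on some worker
        pieces.append(da.from_delayed(fut, shape=block.shape, dtype=x.dtype))
    return da.concatenate(pieces, axis=0)

x = scatter_array(client, np.random.random((1000, 1000, 1000)))
x = x.rechunk((100, 100, 100))  # optionally re-chunk the remaining axes on the cluster
x = x.persist()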

Stay local

Alternatively you can continue to use dask locally, either with the threaded scheduler, or with the distributed scheduler using only the local process.

client = Client(processes=False)

This will stop needless copying of data between your local process, the scheduler, and the workers: they are now all in the same local process.
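A minimal sketch of that fully local setup, repeating the steps from the question (the array is about 8 GB; shrink the shape if you just want to try it out):

import numpy as np
import dask.array as da
from dask.distributed import Client

client = Client(processes=False)             # scheduler and workers share this process

x = np.random.random((1000, 1000, 1000))     # local numpy array (~8 GB)
x = da.from_array(x, chunks=(100, 100, 100))
x = x.persist()                              # no serialization or network copies needed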

See also: "How to efficiently submit tasks with large arguments in Dask distributed?" for a task-based version of this answer.

Answered by MRocklin