I have a large NumPy array on my local machine that I want to parallelize with Dask.array on a cluster:
import numpy as np
x = np.random.random((1000, 1000, 1000))
However, when I use dask.array I find that my scheduler starts taking up a lot of RAM. Why is this? Shouldn't this data go to the workers?
import dask.array as da
x = da.from_array(x, chunks=(100, 100, 100))
from dask.distributed import Client
client = Client(...)
x = x.persist()
Whenever you persist or compute a Dask collection, that data goes to the scheduler and from there to the workers. If you want to bypass storing data on the scheduler, then you will have to learn how to move data with scatter.
You have three options:
1. The best approach is to include loading data as part of your computation rather than doing it locally.
Before
x = load_array_from_file(fn) # load into a local numpy array
x = da.from_array(x, chunks=(100, 100, 100)) # split with dask
x = x.persist()
After
import dask
x = dask.delayed(load_array_from_file)(fn)
x = da.from_delayed(x, shape=(1000, 1000, 1000), dtype=float)
x = x.rechunk((100, 100, 100))
x = x.persist()
More information about creating dask arrays is here: http://dask.pydata.org/en/latest/array-creation.html
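The load_array_from_file function above is just a placeholder for however you read your data. As a minimal, hypothetical sketch (assuming the data sits in a .npy file; the format and the memory-mapping choice are assumptions, not part of the original answer), it could look like:
import numpy as np

def load_array_from_file(fn):
    # memory-map the file so only the chunks that are actually read get pulled into RAM
    return np.load(fn, mmap_mode="r")
The point is that the worker runs this function itself, so the full array never has to pass through your local client or the scheduler.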
2. You can scatter your NumPy array directly to a worker:
future = client.scatter(x)
x = da.from_delayed(future, shape=x.shape, dtype=x.dtype)
x = x.rechunk((100, 100, 100))
x = x.persist()
This moves your data directly to a worker, then chunks it from there. This is nice because it bypasses the scheduler. However, you are now at risk of data loss if your workers start to fail; this only matters if you are running at massive scale.
This is also somewhat inefficient because all of your data is on one worker rather than spread out. You might call client.rebalance, or read on.
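If you take the scatter route and want the chunks spread out afterwards, a minimal sketch (assuming the snippet above has run, so x is persisted and client is a connected Client):
client.rebalance() # ask the scheduler to spread the persisted chunks across workers
client.has_what()  # optional: inspect which worker now holds which keys
Both rebalance and has_what are standard methods on the distributed Client; exactly where the chunks end up is left to the scheduler.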
3. You can chunk your data locally with a local scheduler, then scatter it out to the cluster:
x = da.from_array(x, chunks=(100, 100, 100))
x = x.persist(get=dask.threaded.get) # chunk locally
futures = client.scatter(dict(x.dask)) # scatter chunks
x.dask = futures # re-attach scattered futures as task graph
Alternatively you can continue to use dask locally, either with the threaded scheduler, or with the distributed scheduler using only the local process.
client = Client(processes=False)
This avoids needless copying of data between your local process, the scheduler, and the workers, since they all now live in the same local process.
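Putting that together, a minimal end-to-end sketch of the fully local variant (the array here is shrunk to laptop size; the shapes are illustrative, not the ones from the question):
import numpy as np
import dask.array as da
from dask.distributed import Client

client = Client(processes=False)             # scheduler, workers, and client share one process
x = np.random.random((200, 200, 200))        # small example array
x = da.from_array(x, chunks=(100, 100, 100))
x = x.persist()                              # no data is copied across process boundaries
Because everything lives in one process, persist here roughly amounts to keeping references to the in-memory chunks rather than serializing them over a network.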
See also: How to efficiently submit tasks with large arguments in Dask distributed? for a task-based version of this answer