Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Is there an advantage to pre-scattering data objects in Dask?

If I pre-scatter a data object across worker nodes, does it get copied in its entirety to each of the worker nodes? Is there an advantage in doing so if that data object is big?

Using the futures interface as an example:

client.scatter(data, broadcast=True)
results = dict()
for i in tqdm_notebook(range(replicates)):
    results[i] = client.submit(nn_train_func, data, **params)

Using the delayed interface as an example:

client.scatter(data, broadcast=True)
results = dict()
for i in tqdm_notebook(range(replicates)):
    results[i] = delayed(nn_train_func, data, **params)

The reason I ask is because I noticed the following phenomena:

  1. If I pre-scatter the data, delayed appears to re-send data to the worker nodes, thus approximately doubling memory usage. It appears that pre-scattering is not doing what I expected it to do, which is allow for the worker nodes to reference the pre-scattered data.
  2. The futures interface takes a long time to iterate through the loop (significantly longer). I am currently not sure how to identify where the bottleneck here is.
  3. Using the delayed interface, from the time the compute() function is called to the time that activity is reflected on the dashboard, there is an extensive delay, which I suspected was due to data copying.
like image 982
ericmjl Avatar asked Oct 25 '18 20:10

ericmjl


1 Answers

Pre-scattering is designed to avoid placing large object data into the task graph.

x = np.array(lots_of_data)
a = client.submit(add, x, 1)  # have to send all of x to the scheduler
b = client.submit(add, x, 2)  # again
c = client.submit(add, x, 3)  # and again

You'll feel this pain because client.submit will be slow to return, and Dask may even throw a warning.

So instead we scatter our data, receiving a future in return

x = np.array(lots_of_data)
x_future = client.scatter(x)
a = client.submit(add, x_future, 1)  # Only have to send the future/pointer
b = client.submit(add, x_future, 2)  # so this is fast
c = client.submit(add, x_future, 3)  # and this

In your situation you're almost doing this, the only difference is that you scatter your data, then forget about the future it returns, and send your data again.

client.scatter(data, broadcast=True)  # whoops!  forgot to capture the output
data = client.scatter(data, broadcast=True)  # data is now a future pointing to its remote value

You can choose to broadcast or not. If you know that all of your workers will need this data then it's not a bad thing to do, but things will be fine regardless.

like image 141
MRocklin Avatar answered Oct 23 '22 20:10

MRocklin