
How to properly use dask's upload_file() to pass local code to workers

I have functions in a local_code.py file that I would like to pass to workers through dask. I've seen answers to questions on here saying that this can be done using the upload_file() function, but I can't seem to get it working because I'm still getting a ModuleNotFoundError.

The relevant part of the code is as follows.

from dask.distributed import Client
from dask_jobqueue import SLURMCluster

from local_code import *
helper_file = '/absolute/path/to/local_code.py'

def main():
    with SLURMCluster(**slurm_params) as cluster:

        cluster.scale(n_workers)

        with Client(cluster) as client:
            client.upload_file(helper_file)
            mapping = client.map(myfunc, data)
            client.gather(mapping)

if __name__ == '__main__':
    main()

Note, myfunc is imported from local_code, and there's no error importing it to map. The function myfunc also depends on other functions that are defined in local_code.

With this code, I'm still getting this error

distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x04\x95+\x00\x00\x00\x00\x00\x00\x00\x8c\x11local_code\x94\x8c\x$
Traceback (most recent call last):
  File "/home/gallagher.r/.local/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 61, in loads
    return pickle.loads(x)
ModuleNotFoundError: No module named 'local_code'

Using upload_file() seems so straightforward that I'm not sure what I'm doing wrong. I must have it in the wrong place or not be understanding correctly what is passed to it.

I'd appreciate any help with this. Please let me know if you need any other information or if there's anything else I can supply from the error file.

Ryan Gallagher asked Jul 19 '19 18:07


2 Answers

The upload_file method only uploads the file to the currently available workers. If a worker arrives after you call upload_file then that worker won't have the provided file.
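
To see why a worker without the file fails with ModuleNotFoundError, it may help to look at how a module-level function is pickled: by reference to its module and name, not by value. The following is a minimal stdlib-only sketch of that behaviour. It does not use dask, and the module name demo_local_code is a hypothetical stand-in for local_code.

```python
import importlib
import os
import pickle
import sys
import tempfile

# Create a throwaway module standing in for local_code.py (hypothetical name).
workdir = tempfile.mkdtemp()
with open(os.path.join(workdir, "demo_local_code.py"), "w") as f:
    f.write("def myfunc(x):\n    return x + 1\n")

sys.path.insert(0, workdir)
importlib.invalidate_caches()
import demo_local_code

# The payload records the module name and function name, not the function body.
payload = pickle.dumps(demo_local_code.myfunc)
names_only = b"demo_local_code" in payload and b"myfunc" in payload

# Simulate a worker that never received the file: forget the module and its path.
sys.path.remove(workdir)
del sys.modules["demo_local_code"]
importlib.invalidate_caches()

try:
    pickle.loads(payload)
    error_name = None
except ModuleNotFoundError as exc:
    error_name = type(exc).__name__

print(names_only, error_name)
```

This matches the shape of the log in the question, where the bytes that fail to deserialize contain the string local_code: the receiving process has to import the module itself before it can rebuild the function.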

In your situation, the easiest thing to do is probably to wait until all of the workers have arrived before you call upload_file:

cluster.scale(n)
with Client(cluster) as client:
    client.wait_for_workers(n)
    client.upload_file(...)
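
As a sanity check after the upload, one option is to ask every connected worker whether it can locate the module. This is only a sketch: the helper names module_available and workers_missing_module are made up for illustration, and it assumes client is a dask.distributed.Client, whose run method executes a function on each currently connected worker and returns a dict keyed by worker address.

```python
import importlib.util


def module_available(name):
    """Return True if the top-level module `name` can be found by this process."""
    return importlib.util.find_spec(name) is not None


def workers_missing_module(client, name):
    """Return the sorted addresses of workers that cannot find `name`.

    Assumes `client.run(fn, *args)` returns {worker_address: result},
    as dask.distributed.Client.run does.
    """
    results = client.run(module_available, name)
    return sorted(addr for addr, found in results.items() if not found)
```

Calling workers_missing_module(client, "local_code") right after client.upload_file(...) should give an empty list if every worker connected at that moment received the file. Workers that join later are not covered by this check.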

MRocklin answered Sep 29 '22 20:09


Another option when workers come and go is to use Client.register_worker_callbacks to hook into the moment each new worker is registered. The one caveat is that you need to read the file's contents yourself and pass them to the callback through the partial:

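import functools

# Define the callback before registering it.
def _worker_upload(dask_worker, *, data, fname):
  # Schedule the upload on the worker's own event loop.
  dask_worker.loop.add_callback(
    callback=dask_worker.upload_file,
    comm=None,  # not used
    filename=fname,
    data=data,
    load=True)

fname = ...
# Read the file on the client so its bytes travel with the callback.
with open(fname, 'rb') as f:
  data = f.read()

client.register_worker_callbacks(
  setup=functools.partial(
    _worker_upload, data=data, fname=fname,
  )
)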

This will also upload the file the first time the callback is registered so you can avoid calling client.upload_file entirely.

Richard answered Sep 29 '22 20:09