I have functions in a local_code.py file that I would like to pass to workers through Dask. I've seen answers to questions on here saying that this can be done using the upload_file() function, but I can't seem to get it working because I'm still getting a ModuleNotFoundError.

The relevant part of the code is as follows.
from dask.distributed import Client
from dask_jobqueue import SLURMCluster
from local_code import *

helper_file = '/absolute/path/to/local_code.py'

def main():
    with SLURMCluster(**slurm_params) as cluster:
        cluster.scale(n_workers)
        with Client(cluster) as client:
            client.upload_file(helper_file)
            mapping = client.map(myfunc, data)
            client.gather(mapping)

if __name__ == '__main__':
    main()
Note that myfunc is imported from local_code, and there's no error importing it or passing it to map. myfunc also depends on other functions that are defined in local_code.
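For illustration, the structure of local_code.py is roughly like this (the names and bodies below are placeholders, not the actual file):

# local_code.py (illustrative placeholder, not the real file)
def _helper(x):
    # some intermediate computation used by myfunc
    return x * 2

def myfunc(x):
    # the function passed to client.map; depends on _helper
    return _helper(x) + 1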
With this code, I'm still getting this error:
distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x04\x95+\x00\x00\x00\x00\x00\x00\x00\x8c\x11local_code\x94\x8c\x$
Traceback (most recent call last):
  File "/home/gallagher.r/.local/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 61, in loads
    return pickle.loads(x)
ModuleNotFoundError: No module named 'local_code'
Using upload_file() seems so straightforward that I'm not sure what I'm doing wrong. I must have it in the wrong place or not be understanding correctly what is passed to it.

I'd appreciate any help with this. Please let me know if you need any other information or if there's anything else I can supply from the error file.
The upload_file method only uploads the file to the currently available workers. If a worker arrives after you call upload_file, then that worker won't have the provided file.

In your situation, the easiest thing to do is probably to wait until all of the workers have arrived before you call upload_file:
cluster.scale(n)
with Client(cluster) as client:
    client.wait_for_workers(n)
    client.upload_file(...)
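Applied to the code in the question, that would look roughly like the sketch below (slurm_params, n_workers, and data are assumed to be defined as in the original script):

from dask.distributed import Client
from dask_jobqueue import SLURMCluster
from local_code import *

helper_file = '/absolute/path/to/local_code.py'

def main():
    with SLURMCluster(**slurm_params) as cluster:
        cluster.scale(n_workers)
        with Client(cluster) as client:
            # Block until all n_workers have actually connected, so that
            # upload_file reaches every worker that will run tasks.
            client.wait_for_workers(n_workers)
            client.upload_file(helper_file)
            mapping = client.map(myfunc, data)
            client.gather(mapping)

if __name__ == '__main__':
    main()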
Another option, when you have workers coming and going, is to use Client.register_worker_callbacks to hook into whenever a new worker is registered/added. The one caveat is that you will need to serialize your file(s) into the callback partial:
import functools

def _worker_upload(dask_worker, *, data, fname):
    dask_worker.loop.add_callback(
        callback=dask_worker.upload_file,
        comm=None,  # not used
        filename=fname,
        data=data,
        load=True)

fname = ...
with open(fname, 'rb') as f:
    data = f.read()

client.register_worker_callbacks(
    setup=functools.partial(
        _worker_upload, data=data, fname=fname,
    )
)
This will also upload the file to the workers that are already present when the callback is first registered, so you can avoid calling client.upload_file entirely.
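Putting that into the question's script might look something like the sketch below; os.path.basename is used here (an assumption, not from the original answer) so the file is written as local_code.py in each worker's working directory, and slurm_params, n_workers, and data are again assumed from the original code:

import functools
import os

from dask.distributed import Client
from dask_jobqueue import SLURMCluster
from local_code import *

helper_file = '/absolute/path/to/local_code.py'

def _worker_upload(dask_worker, *, data, fname):
    # Schedule the upload on the worker's event loop; each worker writes
    # out fname and imports it (load=True) as soon as it joins.
    dask_worker.loop.add_callback(
        callback=dask_worker.upload_file,
        comm=None,  # not used
        filename=fname,
        data=data,
        load=True)

def main():
    with open(helper_file, 'rb') as f:
        file_bytes = f.read()

    with SLURMCluster(**slurm_params) as cluster:
        cluster.scale(n_workers)
        with Client(cluster) as client:
            # Runs for workers that already exist and for any that join later,
            # so no separate client.upload_file call is needed.
            client.register_worker_callbacks(
                setup=functools.partial(
                    _worker_upload,
                    data=file_bytes,
                    fname=os.path.basename(helper_file),
                )
            )
            mapping = client.map(myfunc, data)
            client.gather(mapping)

if __name__ == '__main__':
    main()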