
How to store worker-local variables in dask/distributed

Using dask 0.15.0, distributed 1.17.1.

I want to memoize some things per worker, such as a client for accessing Google Cloud Storage, because instantiating it is expensive. I'd rather store this in some kind of worker attribute. What is the canonical way to do this? Or are globals the way to go?

Asked Jul 10 '17 at 09:07 by Vincent Schut



2 Answers

On the worker

You can get access to the local worker with the get_worker function. A slightly cleaner approach than globals is to attach state to the worker:

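from dask.distributed import Client, get_worker

def my_function(x):
    worker = get_worker()
    # Build the expensive object once per worker; later tasks reuse it
    if not hasattr(worker, "my_personal_state"):
        worker.my_personal_state = ...  # e.g. a Google Cloud Storage client
    return x

client = Client()  # or connect to an existing scheduler
future = client.submit(my_function, 1)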

We should probably add a generic namespace variable on workers to serve as a general place for information like this, but haven't yet.
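The "create once, then reuse" part of attaching state to the worker can be factored into a small helper. The sketch below is a hypothetical helper (get_or_create is not part of Dask), shown with a SimpleNamespace standing in for the object that get_worker() would return so it runs without a cluster. It takes a lock on first creation because a multi-threaded worker may run several tasks at once.

```python
import threading
from types import SimpleNamespace

_MISSING = object()  # sentinel so a factory may legitimately return None
_init_lock = threading.Lock()  # guards first-time creation across threads


def get_or_create(holder, name, factory):
    """Return holder.<name>, creating it with factory() on first use.

    Hypothetical helper: in a Dask task, `holder` would be get_worker().
    """
    value = getattr(holder, name, _MISSING)
    if value is _MISSING:
        with _init_lock:
            # Re-check under the lock: another thread may have created it already
            value = getattr(holder, name, _MISSING)
            if value is _MISSING:
                value = factory()
                setattr(holder, name, value)
    return value


# Usage with a stand-in object instead of a real Dask worker
created = []


def make_client():
    created.append(1)  # count how often the "expensive" constructor runs
    return object()


worker = SimpleNamespace()
first = get_or_create(worker, "gcs_client", make_client)
second = get_or_create(worker, "gcs_client", make_client)
```

The attribute name "gcs_client" is arbitrary; picking a distinctive name reduces the chance of clashing with attributes Dask itself sets on the worker.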

As Globals

That said, for things like connections to external services, globals aren't entirely evil. Many systems, such as Tornado, use global singletons.
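A minimal sketch of the global approach, assuming a module-level factory cached with functools.lru_cache. ExpensiveClient, get_client and my_task are hypothetical names. Since each worker process imports the module separately, this yields roughly one client per process, shared by all threads in that process, so the client itself must be safe to share.

```python
import functools


class ExpensiveClient:
    """Hypothetical stand-in for an expensive client (e.g. for cloud storage)."""

    instances = 0

    def __init__(self):
        ExpensiveClient.instances += 1


@functools.lru_cache(maxsize=None)
def get_client():
    # Module-level cache: one instance per process, shared by its threads.
    # lru_cache keeps its cache consistent across threads, but if several
    # threads miss at the same moment the constructor may run more than once.
    return ExpensiveClient()


def my_task(x):
    client = get_client()  # cheap after the first call in this process
    # In real code the task would use `client` here
    return x * 2


results = [my_task(i) for i in range(3)]
```

If constructing the client twice under a startup race is unacceptable, wrap creation in a lock as in a double-checked pattern instead of relying on lru_cache alone.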

If you care about thread safety

Note that workers are often multi-threaded. If your connection object isn't thread-safe, you may need to cache a separate object per thread. For this I recommend using a threading.local object. Dask itself uses one, available at:

from distributed.worker import thread_state
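A minimal stdlib sketch of a per-thread cache, assuming a hypothetical Connection class that must not be shared between threads. It uses its own threading.local rather than attaching attributes to Dask's thread_state, which keeps user data separate from Dask's internal bookkeeping.

```python
import threading

_local = threading.local()  # attributes set here are visible only to the thread that set them


class Connection:
    """Hypothetical connection object that is NOT safe to share across threads."""


def get_connection():
    # Look up this thread's cached connection, creating it on first use
    conn = getattr(_local, "conn", None)
    if conn is None:
        conn = Connection()
        _local.conn = conn
    return conn


results = {}


def run(name):
    # Two calls in the same thread should return the same object
    results[name] = (get_connection(), get_connection())


threads = [threading.Thread(target=run, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

main_conn = get_connection()  # the main thread gets its own, separate connection
```

Each thread reuses its own connection across calls, while different threads never see each other's object.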
Answered Oct 18 '22 at 04:10 by MRocklin


Dask Actors

For simpler use cases, other solutions may be preferable; however, it's worth considering Actors. Actors are currently an experimental feature in Dask that enables stateful computations.


Answered Oct 18 '22 at 05:10 by mcguip