How do I run a dask.distributed cluster in a single thread?

Tags:

python

dask

How can I run a complete Dask.distributed cluster in a single thread? I want to use this for debugging or profiling.

Note: this is a frequently asked question. I'm adding the question and answer here to Stack Overflow just for future reuse.

Asked May 26 '17 by MRocklin


1 Answer

Local Scheduler

If you can get by with the single-machine scheduler's API (just compute), then you can use the single-threaded scheduler:

x.compute(scheduler='single-threaded')
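As a minimal sketch, assuming x is any dask collection (here a dask.delayed object), everything runs in the calling thread, so pdb breakpoints and profilers see the real call stack:

```python
import dask

@dask.delayed
def add(a, b):
    return a + b

x = add(1, 2)
# No threads, no processes: the task graph is executed
# synchronously in the calling thread.
result = x.compute(scheduler='single-threaded')
print(result)  # 3
```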

Distributed Scheduler - Single Machine

If you want to run a dask.distributed cluster on a single machine, you can start the client with no arguments:

from dask.distributed import Client
client = Client()  # Starts local cluster
x.compute()

This uses many threads and processes but operates on one machine.
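As a quick sanity check, here is a hypothetical round trip through that local cluster. The __main__ guard matters because the default local cluster starts worker processes:

```python
from dask.distributed import Client

if __name__ == '__main__':
    client = Client()  # starts a local scheduler plus worker processes
    # submit ships the function and its argument to a worker
    future = client.submit(sum, [1, 2, 3])
    print(future.result())  # 6
    client.close()
```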

Distributed Scheduler - Single Process

Alternatively, if you want to run everything in a single process, you can use the processes=False keyword:

from dask.distributed import Client
client = Client(processes=False)  # Starts local cluster
x.compute()

All of the communication and control happen in a single thread, though computation occurs in a separate thread pool.
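You can confirm that layout with a small sketch: the worker shares our process ID, but a task still runs on a different thread than the caller:

```python
import os
import threading
from dask.distributed import Client

client = Client(processes=False)  # scheduler and worker live in this process
# Same process as the client...
worker_pid = client.submit(os.getpid).result()
print(worker_pid == os.getpid())  # True
# ...but computation happens in the worker's thread pool,
# not the calling thread.
worker_thread = client.submit(threading.get_ident).result()
print(worker_thread == threading.get_ident())  # False
client.close()
```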

Distributed Scheduler - Single Thread

To run control, communication, and computation all in a single thread, you need to give the worker a Tornado concurrent.futures-style Executor that runs tasks immediately in the caller's thread. Beware: this Tornado API may not be public.

from dask.distributed import Scheduler, Worker, Client
from tornado.concurrent import DummyExecutor
from tornado.ioloop import IOLoop
import threading

loop = IOLoop()
e = DummyExecutor()  # executes submitted functions immediately in the calling thread
s = Scheduler(loop=loop)
s.start()
w = Worker(s.address, loop=loop, executor=e)
loop.add_callback(w._start)  # start the worker on the IOLoop

async def f():
    async with Client(s.address, start=False) as c:
        future = c.submit(threading.get_ident)
        result = await future
        return result

>>> threading.get_ident() == loop.run_sync(f)
True
Answered Oct 17 '22 by MRocklin