What is the difference between the following LocalCluster configurations for dask.distributed?
Client(n_workers=4, processes=False, threads_per_worker=1)
versus
Client(n_workers=1, processes=True, threads_per_worker=4)
They both have four threads working on the task graph, but the first has four workers. What, then, would be the benefit of having multiple workers acting as threads as opposed to a single worker with multiple threads?
Edit: just a clarification, I'm aware of the difference between processes, threads and shared memory, so this question is oriented more towards the configurational differences of these two Clients.
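For concreteness, here is a minimal sketch that starts both configurations from the question and prints the worker-to-thread layout (nthreads() is a standard Client method mapping each worker address to its thread count; nothing beyond the two setups above is assumed):

from dask.distributed import Client

if __name__ == '__main__':
    # Four in-process workers, one thread each.
    with Client(n_workers=4, processes=False, threads_per_worker=1) as client:
        print(client)
        print(client.nthreads())   # 4 worker addresses, 1 thread each
    # One worker in its own process, with four threads.
    with Client(n_workers=1, processes=True, threads_per_worker=4) as client:
        print(client)
        print(client.nthreads())   # 1 worker address with 4 threads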
I was inspired by both Victor and Martin's answers to dig a little deeper, so here's an in-depth summary of my understanding. (couldn't do it in a comment)
First, note that the scheduler printout in this version of dask isn't quite intuitive: processes is actually the number of workers, and cores is actually the total number of threads across all workers.
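As a quick cross-check of that (a sketch; scheduler_info() is a standard Client method, and the 'nthreads' field name follows distributed 2.x), you can compare the repr against the scheduler's worker table:

from dask.distributed import Client

if __name__ == '__main__':
    with Client(processes=False, n_workers=4) as client:      # same as Config 2 below
        print(client)                # on a 4-core machine the repr reports processes=4 cores=4
        workers = client.scheduler_info()['workers']
        print(len(workers))                                   # "processes" is really this: 4 workers
        print(sum(w['nthreads'] for w in workers.values()))   # "cores" is really this: 4 threads total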
Secondly, Victor's comments about the TCP address and adding/connecting more workers are good to point out. I'm not sure if more workers could be added to a cluster with processes=False, but I think the answer is probably yes.
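For what it's worth, here is a sketch of how one could test that with LocalCluster.scale() (scale() and scheduler_info() are real methods; whether scaling behaves the same for in-process workers is exactly the open question, so treat this as an experiment rather than a guarantee):

import time
from dask.distributed import Client, LocalCluster

if __name__ == '__main__':
    with LocalCluster(processes=False, n_workers=2, threads_per_worker=1) as cluster:
        with Client(cluster) as client:
            print(client)              # starts with 2 workers
            cluster.scale(4)           # request two more in-process workers
            deadline = time.time() + 10
            while len(client.scheduler_info()['workers']) < 4 and time.time() < deadline:
                time.sleep(0.1)        # give the new workers a moment to register
            print(client)              # should now report 4 workers (if scaling works here)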
Now, consider the following script:
from dask.distributed import Client

if __name__ == '__main__':
    with Client(processes=False) as client:  # Config 1
        print(client)
    with Client(processes=False, n_workers=4) as client:  # Config 2
        print(client)
    with Client(processes=False, n_workers=3) as client:  # Config 3
        print(client)
    with Client(processes=True) as client:  # Config 4
        print(client)
    with Client(processes=True, n_workers=3) as client:  # Config 5
        print(client)
    with Client(processes=True, n_workers=3,
                threads_per_worker=1) as client:  # Config 6
        print(client)
This produces the following output in dask version 2.3.0 on my laptop (4 cores):
<Client: scheduler='inproc://90.147.106.86/14980/1' processes=1 cores=4>
<Client: scheduler='inproc://90.147.106.86/14980/9' processes=4 cores=4>
<Client: scheduler='inproc://90.147.106.86/14980/26' processes=3 cores=6>
<Client: scheduler='tcp://127.0.0.1:51744' processes=4 cores=4>
<Client: scheduler='tcp://127.0.0.1:51788' processes=3 cores=6>
<Client: scheduler='tcp://127.0.0.1:51818' processes=3 cores=3>
Here's my understanding of the differences between the configurations:
Config 1 (processes=False, all defaults): dask calls its function nprocesses_nthreads() to set the defaults (with processes=False, 1 process and threads equal to the available cores).
Config 2 (processes=False, n_workers=4): since n_workers was given, the threads per worker are chosen by dask so that the total number of threads equals the number of cores (i.e., 1 thread per worker). Again, processes in the print output is not exactly correct -- it's actually the number of workers (which in this case are really threads).
Config 3 (processes=False, n_workers=3): since n_workers doesn't divide evenly into the number of cores, dask chooses 2 threads/worker to overcommit instead of undercommit; see the quick check after this list.
Configs 4-6 repeat the same logic with processes=True (note the tcp:// scheduler addresses in the output): Config 4 gets the default of 4 workers with 1 thread each, Config 5 overcommits just like Config 3, and Config 6 pins threads_per_worker=1, so only 3 threads in total.
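To verify the overcommit directly rather than trusting the repr, here is a short check (assuming a 4-core machine like the one above; nthreads() is a standard Client method returning a worker-address -> thread-count mapping):

from dask.distributed import Client

if __name__ == '__main__':
    # Three worker processes on four cores: dask overcommits to 2 threads
    # per worker (3 x 2 = 6 total), unless threads_per_worker pins it down.
    with Client(processes=True, n_workers=3) as client:                         # like Config 5
        print(client.nthreads())   # expect 2 threads for each of the 3 workers
    with Client(processes=True, n_workers=3, threads_per_worker=1) as client:   # like Config 6
        print(client.nthreads())   # expect 1 thread for each worker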