Difference between dask.distributed LocalCluster with threads vs. processes

What is the difference between the following LocalCluster configurations for dask.distributed?

Client(n_workers=4, processes=False, threads_per_worker=1)

versus

Client(n_workers=1, processes=True, threads_per_worker=4)

They both have four threads working on the task graph, but the first has four workers. What, then, would be the benefit of having multiple workers acting as threads as opposed to a single worker with multiple threads?

Edit: just a clarification, I'm aware of the difference between processes, threads and shared memory, so this question is oriented more towards the configurational differences of these two Clients.

Asked by jrinker, Sep 02 '19 at 16:09


1 Answer

I was inspired by both Victor's and Martin's answers to dig a little deeper, so here's an in-depth summary of my understanding. (It was too long for a comment.)

First, note that the Client printout in this version of dask isn't quite intuitive: processes is actually the number of workers, and cores is actually the total number of threads across all workers.

Second, Victor's comments about the TCP address and adding/connecting more workers are worth noting. I'm not sure whether more workers can be added to a cluster with processes=False, but I think the answer is probably yes.

Now, consider the following script:

from dask.distributed import Client

if __name__ == '__main__':
    with Client(processes=False) as client:  # Config 1
        print(client)
    with Client(processes=False, n_workers=4) as client:  # Config 2
        print(client)
    with Client(processes=False, n_workers=3) as client:  # Config 3
        print(client)
    with Client(processes=True) as client:  # Config 4
        print(client)
    with Client(processes=True, n_workers=3) as client:  # Config 5
        print(client)
    with Client(processes=True, n_workers=3,
                threads_per_worker=1) as client:  # Config 6
        print(client)

This produces the following output with dask version 2.3.0 on my laptop (4 cores):

<Client: scheduler='inproc://90.147.106.86/14980/1' processes=1 cores=4>
<Client: scheduler='inproc://90.147.106.86/14980/9' processes=4 cores=4>
<Client: scheduler='inproc://90.147.106.86/14980/26' processes=3 cores=6>
<Client: scheduler='tcp://127.0.0.1:51744' processes=4 cores=4>
<Client: scheduler='tcp://127.0.0.1:51788' processes=3 cores=6>
<Client: scheduler='tcp://127.0.0.1:51818' processes=3 cores=3>

Here's my understanding of the differences between the configurations:

  1. The scheduler and all workers run as threads within the Client process. (As Martin said, this is useful for introspection.) Because neither the number of workers nor the number of threads per worker is given, dask calls its function nprocesses_nthreads() to set the defaults (with processes=False, 1 worker with as many threads as there are available cores).
  2. Same as 1, but since n_workers was given, dask chooses the number of threads per worker such that the total number of threads equals the number of cores (i.e., 1 thread per worker). Again, processes in the printout is not exactly correct: it's actually the number of workers (which in this case are threads).
  3. Same as 2, but since n_workers doesn't divide evenly into the number of cores, dask chooses 2 threads per worker to overcommit instead of undercommit.
  4. The Client, Scheduler and all workers are separate processes. Dask chooses the default number of workers (equal to the number of cores, because that number is <= 4) and the default number of threads per worker (1).
  5. Same process/thread configuration as 4, but the total number of threads is overcommitted for the same reason as in 3.
  6. This behaves as expected: 3 worker processes with 1 thread each.
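
The rules described in the list above can be written down as a small pure-Python sketch. This is not dask's own code: default_layout is a hypothetical helper that only encodes my reading of the six cases, and it assumes a machine with at most 4 cores for the processes=True default, since that is the only case covered here. It does not import dask at all.

```python
import math


def default_layout(cores, processes, n_workers=None, threads_per_worker=None):
    """Hypothetical helper: return (n_workers, threads_per_worker).

    Encodes only the defaults described in the list above; it is a sketch,
    not the logic dask itself runs.
    """
    if n_workers is None and threads_per_worker is None:
        if not processes:
            # Config 1: one worker holding one thread per core.
            return 1, cores
        if cores > 4:
            # Assumption: the answer only covers machines with <= 4 cores.
            raise NotImplementedError("defaults for > 4 cores are not covered")
        # Config 4: one single-threaded worker process per core.
        return cores, 1
    if n_workers is None:
        # Only threads_per_worker given: not covered by the six configs.
        raise NotImplementedError("threads_per_worker without n_workers")
    if threads_per_worker is None:
        # Configs 2, 3, 5: round up, so threads are overcommitted
        # rather than undercommitted.
        threads_per_worker = max(1, math.ceil(cores / n_workers))
    # Config 6: both values given explicitly.
    return n_workers, threads_per_worker


if __name__ == "__main__":
    configs = [
        dict(processes=False),
        dict(processes=False, n_workers=4),
        dict(processes=False, n_workers=3),
        dict(processes=True),
        dict(processes=True, n_workers=3),
        dict(processes=True, n_workers=3, threads_per_worker=1),
    ]
    for i, kwargs in enumerate(configs, start=1):
        workers, threads = default_layout(4, **kwargs)
        # Same labels as the Client printout: "processes" is the worker
        # count and "cores" is the total thread count.
        print(f"Config {i}: processes={workers} cores={workers * threads}")
```

With cores=4, the processes/cores pairs this prints match the six lines of output shown earlier, which suggests the reading above is at least consistent with what dask 2.3.0 reported on my laptop.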
Answered by jrinker, Nov 05 '22 at 14:11