What is the "right" way to close a Dask LocalCluster?

I am trying to use dask-distributed on my laptop with a LocalCluster, but I have not yet found a way to let my application exit without raising warnings or triggering strange interactions with matplotlib (I am using the TkAgg backend).

For example, if I close both the client and the cluster, in that order, Tk cannot properly remove the image from memory and I get the following error:

Traceback (most recent call last):
  File "/opt/Python-3.6.0/lib/python3.6/tkinter/__init__.py", line 3501, in __del__
    self.tk.call('image', 'delete', self.name)
RuntimeError: main thread is not in main loop

The following code reproduces the error:

from time import sleep
import numpy as np
import matplotlib.pyplot as plt
from dask.distributed import Client, LocalCluster

if __name__ == '__main__':
    cluster = LocalCluster(
        n_workers=2,
        processes=True,
        threads_per_worker=1
    )
    client = Client(cluster)

    x = np.linspace(0, 1, 100)
    y = x * x
    plt.plot(x, y)

    print('Computation complete! Stopping workers...')
    client.close()
    sleep(1)
    cluster.close()

    print('Execution complete!')

The sleep(1) line makes the problem more likely to appear; it does not occur on every run.

Every other combination I tried for stopping the execution (not closing the client, not closing the cluster, or closing neither) causes problems with tornado instead, usually the following:

tornado.application - ERROR - Exception in Future <Future cancelled> after timeout

What is the right combination to stop the local cluster and the client? Am I missing something?

These are the libraries that I am using:

  • python 3.[6,7].0
  • tornado 5.1.1
  • dask 0.20.0
  • distributed 1.24.0
  • matplotlib 3.0.1

Thank you for your help!

SteP asked Nov 20 '18 14:11



2 Answers

Expanding on skibee's answer, here is a pattern I use. It sets up a temporary LocalCluster and then shuts it down. Very useful when different parts of your code must be parallelized in different ways (e.g. one needs threads and the other needs processes).

from dask.distributed import Client, LocalCluster
import multiprocessing as mp

# With processes=True, run this under `if __name__ == '__main__':` in a script.
with LocalCluster(
    n_workers=int(0.9 * mp.cpu_count()),
    processes=True,
    threads_per_worker=1,
    memory_limit='2GB',
    ip='tcp://localhost:9895',
) as cluster, Client(cluster) as client:
    # Do something using 'client'
    pass

What's happening above:

  • A cluster is being spun up on your local machine (i.e. the one running the Python interpreter). The scheduler of this cluster is listening on port 9895.

  • The cluster is created and a number of workers are spun up. Each worker is a process, since I specified processes=True.

  • The number of workers spun up is 90% of the number of CPU cores, rounded down. So an 8-core machine will spawn 7 worker processes. This leaves at least one core free for SSH / Notebook server / other applications.

  • Each worker is limited to 2GB of RAM (memory_limit='2GB'). Having a temporary cluster allows you to spin up workers with different amounts of RAM for different workloads.

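  • Once the with block exits, both client.close() and cluster.close() are called, in that order, because context managers exit in the reverse of the order in which they were entered. client.close() disconnects the client (created in your Python interpreter) from the cluster, and cluster.close() shuts down the cluster: the scheduler, the nannies and all workers.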
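
The order in which a multi-item with statement cleans up can be checked without Dask. The sketch below is only an illustration: Resource and run_demo are hypothetical stand-ins, not part of dask.distributed, and the log simply records when each stand-in is entered and exited.

```python
class Resource:
    """Hypothetical stand-in for a cluster or client; records open/close events."""

    def __init__(self, name, log):
        self.name = name
        self.log = log

    def __enter__(self):
        self.log.append(f"open {self.name}")
        return self

    def __exit__(self, exc_type, exc, tb):
        self.log.append(f"close {self.name}")
        return False  # do not swallow exceptions raised inside the block


def run_demo():
    """Enter two resources in one with statement and return the event log."""
    log = []
    with Resource("cluster", log) as cluster, Resource("client", log) as client:
        log.append("work")
    return log


if __name__ == "__main__":
    # → ['open cluster', 'open client', 'work', 'close client', 'close cluster']
    print(run_demo())
```

The item listed last (the client) is closed first, so the client disconnects before the cluster it was attached to is torn down.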

While the workers are processing, you can check whether the cluster is active by running lsof -i :9895. If there is no output, the cluster has closed.
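
If lsof is not available, a rough equivalent can be sketched in Python with the standard socket module. This is an assumption-laden approximation, not a Dask API: is_port_open is a hypothetical helper, and it only tests whether something accepts TCP connections on the port, whereas lsof lists every process holding a socket on it.

```python
import socket


def is_port_open(port, host="localhost", timeout=0.5):
    """Return True if something accepts TCP connections on host:port."""
    try:
        # create_connection raises OSError (e.g. connection refused, timeout)
        # when nothing is listening on the given address.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # 9895 is the scheduler port used in the example above.
    print("scheduler reachable:", is_port_open(9895))
```

Calling is_port_open(9895) once inside the with block and once after it gives a quick check that the scheduler has stopped listening.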


Sample use-case: suppose you want to use a pre-trained ML model to predict on 1,000,000 examples.

The model is optimized/vectorized such that it can predict on 10K examples quickly, but 1M is slow. In such a case, one setup that works is to load multiple copies of the model from disk and use them to predict on chunks of the 1M examples.

Dask allows you to do this pretty easily and achieve a good speedup:

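import multiprocessing as mp

import dask.array as DaskArray
import numpy as np
from dask.distributed import Client, LocalCluster

def load_and_predict(input_data_chunk):
    # `some_library` is a placeholder for whatever library provides your model.
    model_path = '...' # On your disk, so accessible by all processes.
    model = some_library.load_model(model_path)
    labels, scores = model.predict(input_data_chunk, ...)
    # One block of results with shape (2, len(input_data_chunk)).
    return np.array([labels, scores])

# (not shown) Load `input_data`, a list of your 1M examples.

da_input_data = DaskArray.from_array(input_data, chunks=(10_000,))

prediction_results = None
with LocalCluster(
    n_workers=int(0.9 * mp.cpu_count()),
    processes=True,
    threads_per_worker=1,
    memory_limit='2GB',
    ip='tcp://localhost:9895',
) as cluster, Client(cluster) as client:
    # Each output block gains a leading axis of length 2 (labels, scores),
    # so the output block shape is declared explicitly. This assumes
    # `input_data` is one-dimensional.
    prediction_results = da_input_data.map_blocks(
        load_and_predict,
        new_axis=0,
        chunks=(2, 10_000),
        dtype=float,  # assumption: labels and scores are numeric
    ).compute()

# With the settings above, prediction_results is a single NumPy array of
# shape (2, 1_000_000): row 0 holds the labels and row 1 the scores.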

References:

  • Setting up a local cluster: https://distributed.dask.org/en/latest/local-cluster.html
  • Client close method: https://distributed.dask.org/en/latest/api.html#distributed.Client.close
  • Scheduler close method, which from my understanding is what is invoked by cluster.close(): https://distributed.dask.org/en/latest/scheduling-state.html#distributed.scheduler.Scheduler.close

  • with statement having multiple variables: https://stackoverflow.com/a/1073814/4900327

Abhishek Divekar answered Oct 16 '22 16:10



In our experience, the best way is to use a context manager, for example:

import numpy as np
import matplotlib.pyplot as plt
from dask.distributed import Client, LocalCluster 

if __name__ == '__main__':
    cluster = LocalCluster(
        n_workers=2,
        processes=True,
        threads_per_worker=1
    )
    with Client(cluster) as client:
        x = np.linspace(0, 1, 100)
        y = x * x
        plt.plot(x, y)
        print('Computation complete! Stopping workers...')

    print('Execution complete!')

skibee answered Oct 16 '22 15:10
