Managing worker memory on a Dask LocalCluster

I am trying to load a dataset with Dask, but when it is time to compute it I keep getting warnings like this:

WARNING - Worker exceeded 95% memory budget. Restarting.

I am just working on my local machine, initializing Dask as follows:

if __name__ == '__main__':
    libmarket.config.client = Client()  # use dask.distributed by default

Now in my error messages I keep seeing a reference to a memory_limit= keyword parameter, but I've searched the Dask documentation thoroughly and I can't figure out how to increase the bloody worker memory limit in a single-machine configuration. I have 256GB of RAM and I'm removing the majority of the future's columns (a 20GB CSV file) before converting it back into a pandas dataframe, so I know it will fit in memory. I just need to increase the per-worker memory limit from my code (not using dask-worker) so that I can process it.
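
For context, my pipeline looks roughly like this (the file name and column names here are placeholders, not my real ones):

import dask.dataframe as dd

# placeholder path and columns; the real file is a 20GB CSV
df = dd.read_csv('market_data.csv')
df = df[['timestamp', 'price']]  # keep only the few columns I need
pdf = df.compute()               # this is the step where workers get restarted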

Please, somebody help me.

Asked by Jones on Dec 26 '18


1 Answer

The argument memory_limit can be provided to the __init__() methods of both Client and LocalCluster.

general remarks

Just calling Client() is a shortcut for first calling LocalCluster() and then calling Client with the created cluster (see Dask: Single Machine). When Client is called without an instance of LocalCluster, all possible arguments of LocalCluster.__init__() can be passed to the initialization call of Client. This is why arguments such as memory_limit and n_workers are not documented in the API documentation of the Client class.

However, the argument memory_limit does not seem to be properly documented in the API documentation of LocalCluster either (see Dask GitHub issue #4118).

solution

A working example is the following. I added a few more arguments that might be useful for people finding this question/answer.

# load/import classes
from dask.distributed import Client, LocalCluster

# set up cluster and workers
cluster = LocalCluster(n_workers=4,
                       threads_per_worker=1,
                       memory_limit='64GB')  # limit applies per worker, not in total
client = Client(cluster)

# have a look at your workers
client

# do some work
## ... 

# close workers and cluster
client.close()
cluster.close()

The shortcut, passing the same arguments directly to Client, would be:

# load/import classes
from dask.distributed import Client

# set up cluster and workers
client = Client(n_workers=4, 
                threads_per_worker=1,
                memory_limit='64GB')

# have a look at your workers
client

# do some work
## ... 

# close workers and cluster
client.close()
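
Between setting up the cluster and closing it, you can verify that the limit was applied by asking the scheduler for its view of the workers. This is a minimal sketch: scheduler_info() exists in dask.distributed, but the exact layout of the returned dictionary may vary between versions.

# assumes `client` was created as in one of the examples above
info = client.scheduler_info()
for address, worker in info['workers'].items():
    print(address, worker['memory_limit'])  # limit in bytes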

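Note that memory_limit applies per worker, so the four workers above may together use up to 256GB. The "95% memory budget" warning from the question corresponds to the worker termination threshold, which, along with the spill and pause thresholds, can also be tuned through the Dask configuration system. The following is a sketch assuming a recent version of dask.distributed; the configuration keys may differ in older releases. Set them before creating the cluster so the workers pick them up.

import dask

# fractions of memory_limit at which a worker reacts
dask.config.set({
    'distributed.worker.memory.target': 0.60,     # start spilling managed data to disk
    'distributed.worker.memory.spill': 0.70,      # spill based on process memory
    'distributed.worker.memory.pause': 0.80,      # pause accepting new tasks
    'distributed.worker.memory.terminate': 0.95,  # restart the worker
})
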
further reading

  • https://distributed.dask.org/en/latest/local-cluster.html
  • https://github.com/dask/dask/issues/4118
Answered by daniel.heydebreck on Oct 25 '22