I am trying to load a dataset with Dask, but when it is time to compute it I keep getting warnings like this:
WARNING - Worker exceeded 95% memory budget. Restarting.
I am just working on my local machine, initializing Dask as follows:
if __name__ == '__main__':
    libmarket.config.client = Client()  # use dask.distributed by default
Now in my error messages I keep seeing a reference to a 'memory_limit=' keyword parameter. However, I've searched the Dask documentation thoroughly and I can't figure out how to increase the bloody worker memory limit in a single-machine configuration. I have 256GB of RAM, and I'm removing the majority of the future's columns (a 20GB CSV file) before converting it back into a pandas DataFrame, so I know the result will fit in memory. I just need to increase the per-worker memory limit from my code (not using dask-worker) so that I can process it.
Please, somebody help me.
The argument memory_limit can be provided to the __init__() functions of both Client and LocalCluster. Note that memory_limit is the limit per worker, not for the whole cluster.
Just calling Client() is a shortcut for first calling LocalCluster() and then Client with the created cluster (Dask: Single Machine). When Client is called without an instance of LocalCluster, all possible arguments of LocalCluster.__init__() can be passed to the initialization call of Client. Because of this forwarding, arguments such as memory_limit and n_workers are not listed in the API documentation of the Client class. However, memory_limit also does not seem to be properly documented in the API documentation of LocalCluster (see Dask GitHub Issue #4118).
A working example is given below. I added a few more arguments that might be useful for people who find this question/answer.
# load/import classes
from dask.distributed import Client, LocalCluster

# set up cluster and workers
cluster = LocalCluster(n_workers=4,
                       threads_per_worker=1,
                       memory_limit='64GB')
client = Client(cluster)

# have a look at your workers
client

# do some work
## ...

# close workers and cluster
client.close()
cluster.close()
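To verify that the limit was actually applied, you can inspect the per-worker metadata reported by the scheduler. This is a minimal sketch, assuming the dictionary returned by Client.scheduler_info() lists the workers under the 'workers' key with a per-worker 'memory_limit' field (in bytes):
# print the configured memory limit for each worker
for address, info in client.scheduler_info()['workers'].items():
    print(address, info['memory_limit'])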
The shortcut would be
# load/import classes
from dask.distributed import Client

# set up cluster and workers
client = Client(n_workers=4,
                threads_per_worker=1,
                memory_limit='64GB')

# have a look at your workers
client

# do some work
## ...

# close workers and cluster
client.close()
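Applied to the workflow in the question, the whole thing might look roughly like the sketch below. The file path and column names are placeholders, and usecols is simply forwarded by dd.read_csv to pandas.read_csv, so only the needed columns are ever parsed:
# load/import classes
import dask.dataframe as dd
from dask.distributed import Client

if __name__ == '__main__':
    # 4 workers x 64GB per worker = 256GB, matching the machine's RAM
    client = Client(n_workers=4,
                    threads_per_worker=1,
                    memory_limit='64GB')

    # read the large CSV lazily, keeping only the columns that are needed
    # (path and column names are hypothetical)
    ddf = dd.read_csv('/path/to/data.csv', usecols=['col_a', 'col_b'])

    # materialize the reduced dataset as an ordinary pandas DataFrame
    df = ddf.compute()

    client.close()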