I'm using dask/distributed to submit 100+ evaluations of a function to a multi-node cluster. Each evaluation is fairly costly, about 90 s of CPU time. I've noticed, though, that there seems to be a memory leak: all workers grow in memory over time, even though the function I'm evaluating is marked as not pure. Here's sample code to reproduce this behavior:
import numpy as np
from dask.distributed import Client

class Foo:
    def __init__(self):
        self.a = np.random.rand(2000, 2000)  # dummy data, not really used

    @staticmethod
    def myfun1(k):
        return np.random.rand(10000 + k, 100)

    def myfun2(self, k):
        return np.random.rand(10000 + k, 100)

client = Client('XXX-YYY:8786')
f = Foo()
tasks = client.map(f.myfun2, range(100), pure=False)
results = client.gather(tasks)
tasks = []
If client.map() is called with f.myfun1 (which is just a static method), the workers don't grow in size. However, if one calls f.myfun2, the workers' size grows considerably (e.g. from ~50 MB to ~400 MB) after just one client.map() call as above. Also, client.close() does nothing to shrink the workers back down.
Is this a memory leak, or am I not using dask.distributed correctly? I definitely don't care about the results of my calculations being available afterwards or shared on the cluster. FWIW, tested with distributed v1.19.1 and Python 3.5.4.
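To check that the growth really happens in the worker processes (and not, say, on the scheduler), one option is to run a small probe on every worker via client.run. This is only a sketch: it assumes psutil is installed on the workers, which distributed does not strictly require.

```python
import os

import psutil  # assumed available on the workers


def worker_rss():
    # Resident set size of the current process, in MB.
    # When run via client.run, "current process" is each worker.
    return psutil.Process(os.getpid()).memory_info().rss / 1e6


# With a live cluster you could then compare before/after client.map:
# rss_by_worker = client.run(worker_rss)   # {worker_address: MB, ...}
```

Calling this before and after the map makes the ~50 MB -> ~400 MB jump directly visible per worker.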
Nice example.

Your myfun2 method is attached to your f = Foo() object, which carries around a decently large attribute (f.a). The f.myfun2 method is thus actually quite expensive to move around, and you're creating 100 copies of it. If you can, it's best to avoid using methods of large objects in a distributed setting. Instead, consider using plain functions.