
Python 3: Multiprocessing consumes excessive RAM and slows down

I start multiple processes in order to create a list of new objects. htop shows me between 1 and 4 processes (I always create 3 new objects).

def foo(self):
    with multiprocessing.Pool(processes=3, maxtasksperchild=10) as pool:
        result = pool.map_async(self.new_obj, self.information)
        self.new_objs = result.get()
        pool.terminate()
    gc.collect()

I call foo() multiple times. Each time it is called, the whole process runs slower, and in the end the program does not even finish because it slows down too much. It also starts to eat up all my RAM, while the sequential approach has no significant RAM usage.

When I kill the program, this is usually the function it was last executing:

->File "threading.py", line 293, in wait
    waiter.acquire()

Edit: To give some information about my circumstances: I create a tree made of nodes. foo() is called by a parent node in order to create its child nodes. The results returned by the processes are these child nodes, which are saved in a list at the parent node. I want to parallelize the creation of those child nodes instead of creating them sequentially.

asked Jul 01 '16 08:07 by Natjo


1 Answer

I think your issue has mainly to do with the fact that your parallelised function is a method of the object. It's hard to be certain without more information, but consider this little toy program:

import multiprocessing as mp
import numpy as np
import gc


class Object(object):
    def __init__(self, _):
        self.data = np.empty((100, 100, 100), dtype=np.float64)


class Container(object):
    def __new__(cls):
        self = object.__new__(cls)
        print("Born")
        return self

    def __init__(self):
        self.objects = []

    def foo(self):
        with mp.Pool(processes=3, maxtasksperchild=10) as pool:
            result = pool.map_async(self.new_obj, range(50))
            self.objects.extend(result.get())
            pool.terminate()
        gc.collect()

    def new_obj(self, i):
        return Object(i)

    def __del__(self):
        print("Dead")


if __name__ == '__main__':
    c = Container()
    for j in range(5):
        c.foo()

Container is instantiated only once, so you'd expect to see a single "Born" followed by a single "Dead". But since the code executed by the worker processes is a bound method of the container, the whole container has to be pickled, sent to the workers and rebuilt there. Running this, you will see a stream of intermingled "Born" and "Dead" lines as the container is rebuilt for every batch of tasks that map sends out:

Born
Born
Born
Born
Born
Dead
Born
Dead
Dead
Born
Dead
Born
... 
<MANY MORE LINES HERE>
...
Born
Dead
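The copying can also be observed without a pool. The following is a minimal sketch, assuming a hypothetical Holder class standing in for Container: pickling a bound method pickles the instance it is bound to, so the payload size follows the instance's state, while a module-level function is pickled by name only.

```python
import pickle


class Holder:
    """Hypothetical stand-in for the Container above."""

    def __init__(self, payload):
        self.payload = payload

    def work(self, i):
        return i * 2


def work(i):
    """Module-level equivalent of Holder.work."""
    return i * 2


small = Holder(payload=b"")
big = Holder(payload=b"x" * 1_000_000)

# A bound method is pickled together with the instance it belongs to.
bound_small = len(pickle.dumps(small.work))
bound_big = len(pickle.dumps(big.work))

# A module-level function is pickled as a reference (module and name).
plain = len(pickle.dumps(work))

print("bound method, empty instance:", bound_small, "bytes")
print("bound method, 1 MB instance: ", bound_big, "bytes")
print("module-level function:       ", plain, "bytes")
```

The pool performs this kind of pickling for the work it sends to the workers, which is why the size of the container matters so much here.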

To convince yourself that the entire container is being copied and sent around every time, try setting a non-serialisable attribute on it:

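def foo(self):
    # Set the unpicklable attribute before submitting work, so that it is
    # guaranteed to be present when the bound method is pickled.
    self.fn = lambda x: x**2
    with mp.Pool(processes=3, maxtasksperchild=10) as pool:
        result = pool.map_async(self.new_obj, range(50))
        self.objects.extend(result.get())
        pool.terminate()
    gc.collect()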

This immediately raises an AttributeError, because the container can no longer be serialised.
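The same failure can be reproduced with pickle alone. This is a small sketch with a hypothetical Holder class; the exact exception type depends on the Python version and on where the lambda is defined, so both likely candidates are caught.

```python
import pickle


class Holder:
    """Hypothetical stand-in for the Container above."""

    def work(self, i):
        return i * 2


holder = Holder()
pickle.dumps(holder.work)  # fine: the instance holds nothing unpicklable

# A lambda has no importable name, so pickle cannot serialise it.
holder.fn = lambda x: x ** 2

try:
    pickle.dumps(holder.work)
    failed = False
except (AttributeError, pickle.PicklingError) as exc:
    failed = True
    print(type(exc).__name__, exc)
```

The method itself never touches fn; the failure comes purely from the instance being dragged along with the bound method.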

Let's sum up: when sending 1000 requests to the pool, Container will be serialised, sent to the worker processes and deserialised there up to 1000 times (map groups the inputs into chunks, and the container travels with every chunk). Those copies are eventually dropped (assuming there's not too much weird cross-referencing going on), but in the meantime they put a lot of pressure on the RAM, because the container and everything it references is copied again and again.

How can you solve that? Ideally, do not share state: make the worker a module-level function instead of a method.

def new_obj(i):
    # Module-level function: pickled by name, so no Container is copied.
    return Object(i)


class Container(object):
    def __new__(cls):
        self = object.__new__(cls)
        print("Born")
        return self

    def __init__(self):
        self.objects = []

    def foo(self):
        with mp.Pool(processes=3, maxtasksperchild=10) as pool:
            result = pool.map_async(new_obj, range(50))
            self.objects.extend(result.get())
            pool.terminate()
        gc.collect()

    def __del__(self):
        print("Dead")

This completes in a fraction of the time and only produces the tiniest blip on the RAM, as only a single Container is ever built. If you need some of the internal state to be passed to the workers, extract it and send just that:

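def new_obj(tup):
    # Unpack only the state the worker actually needs.
    very_important_state, parameters = tup
    # Assumes Object.__init__ accepts these two keyword arguments.
    return Object(very_important_state=very_important_state,
                  parameters=parameters)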


class Container(object):
    def __new__(cls):
        self = object.__new__(cls)
        print("Born")
        return self

    def __init__(self):
        self.objects = []

    def foo(self):
        important_state = len(self.objects)
        with mp.Pool(processes=3, maxtasksperchild=10) as pool:
            result = pool.map_async(new_obj,
                                    ((important_state, i) for i in range(50)))
            self.objects.extend(result.get())
            pool.terminate()
        gc.collect()

    def __del__(self):
        print("Dead")

This has the same behaviour as before. If you absolutely cannot avoid sharing some mutable state between the processes, check out the multiprocessing tools for doing that without having to copy everything everywhere every time.

answered Oct 05 '22 20:10 by val