Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Python decorator with multiprocessing fails

I would like to use a decorator on a function that I will subsequently pass to a multiprocessing pool. However, the code fails with "PicklingError: Can't pickle : attribute lookup __builtin__.function failed". I don't quite see why it fails here. I feel certain that it's something simple, but I can't find it. Below is a minimal "working" example. I thought that using the functools function would be enough to let this work.

If I comment out the function decoration, it works without an issue. What is it about multiprocessing that I'm misunderstanding here? Is there any way to make this work?

Edit: After adding both a callable class decorator and a function decorator, it turns out that the function decorator works as expected. The callable class decorator continues to fail. What is it about the callable class version that keeps it from being pickled?

import random
import multiprocessing
import functools

class my_decorator_class(object):
    def __init__(self, target):
        self.target = target
        try:
            functools.update_wrapper(self, target)
        except:
            pass

    def __call__(self, elements):
        f = []
        for element in elements:
            f.append(self.target([element])[0])
        return f

def my_decorator_function(target):
    @functools.wraps(target)
    def inner(elements):
        f = []
        for element in elements:
            f.append(target([element])[0])
        return f
    return inner

@my_decorator_function
def my_func(elements):
    f = []
    for element in elements:
        f.append(sum(element))
    return f

if __name__ == '__main__':
    elements = [[random.randint(0, 9) for _ in range(5)] for _ in range(10)]
    pool = multiprocessing.Pool(processes=4)
    results = [pool.apply_async(my_func, ([e],)) for e in elements]
    pool.close()
    f = [r.get()[0] for r in results]
    print(f)
like image 712
agarrett Avatar asked Feb 17 '12 22:02

agarrett


3 Answers

The problem is that pickle needs to have some way to reassemble everything that you pickle. See here for a list of what can be pickled:

http://docs.python.org/library/pickle.html#what-can-be-pickled-and-unpickled

When pickling my_func, the following components need to be pickled:

  • An instance of my_decorator_class, called my_func.

    This is fine. Pickle will store the name of the class and pickle its __dict__ contents. When unpickling, it uses the name to find the class, then creates an instance and fills in the __dict__ contents. However, the __dict__ contents present a problem...

  • The instance of the original my_func that's stored in my_func.target.

    This isn't so good. It's a function at the top-level, and normally these can be pickled. Pickle will store the name of the function. The problem, however, is that the name "my_func" is no longer bound to the undecorated function, it's bound to the decorated function. This means that pickle won't be able to look up the undecorated function to recreate the object. Sadly, pickle doesn't have any way to know that object it's trying to pickle can always be found under the name __main__.my_func.

You can change it like this and it will work:

import random
import multiprocessing
import functools

class my_decorator(object):
    def __init__(self, target):
        self.target = target
        try:
            functools.update_wrapper(self, target)
        except:
            pass

    def __call__(self, candidates, args):
        f = []
        for candidate in candidates:
            f.append(self.target([candidate], args)[0])
        return f

def old_my_func(candidates, args):
    f = []
    for c in candidates:
        f.append(sum(c))
    return f

my_func = my_decorator(old_my_func)

if __name__ == '__main__':
    candidates = [[random.randint(0, 9) for _ in range(5)] for _ in range(10)]
    pool = multiprocessing.Pool(processes=4)
    results = [pool.apply_async(my_func, ([c], {})) for c in candidates]
    pool.close()
    f = [r.get()[0] for r in results]
    print(f)

You have observed that the decorator function works when the class does not. I believe this is because functools.wraps modifies the decorated function so that it has the name and other properties of the function it wraps. As far as the pickle module can tell, it is indistinguishable from a normal top-level function, so it pickles it by storing its name. Upon unpickling, the name is bound to the decorated function so everything works out.

like image 77
Weeble Avatar answered Oct 17 '22 19:10

Weeble


I also had some problem using decorators in multiprocessing. I'm not sure if it's the same problem as yours:

My code looked like this:

from multiprocessing import Pool

def decorate_func(f):
    def _decorate_func(*args, **kwargs):
        print "I'm decorating"
        return f(*args, **kwargs)
    return _decorate_func

@decorate_func
def actual_func(x):
    return x ** 2

my_swimming_pool = Pool()
result = my_swimming_pool.apply_async(actual_func,(2,))
print result.get()

and when I run the code I get this:

Traceback (most recent call last):
  File "test.py", line 15, in <module>
    print result.get()
  File "somedirectory_too_lengthy_to_put_here/lib/python2.7/multiprocessing/pool.py", line 572, in get
    raise self._value
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

I fixed it by defining a new function to wrap the function in the decorator function, instead of using the decorator syntax

from multiprocessing import Pool

def decorate_func(f):
    def _decorate_func(*args, **kwargs):
        print "I'm decorating"
        return f(*args, **kwargs)
    return _decorate_func

def actual_func(x):
    return x ** 2

def wrapped_func(*args, **kwargs):
    return decorate_func(actual_func)(*args, **kwargs)

my_swimming_pool = Pool()
result = my_swimming_pool.apply_async(wrapped_func,(2,))
print result.get()

The code ran perfectly and I got:

I'm decorating
4

I'm not very experienced at Python, but this solution solved my problem for me

like image 33
blueishpoop Avatar answered Oct 17 '22 18:10

blueishpoop


If you want the decorators too bad (like me), you can also use the exec() command on the function string, to circumvent the mentioned pickling.

I wanted to be able to pass all the arguments to an original function and then use them successively. The following is my code for it.

At first, I made a make_functext() function to convert the target function object to a string. For that, I used the getsource() function from the inspect module (see doctumentation here and note that it can't retrieve source code from compiled code etc.). Here it is:

from inspect import getsource

def make_functext(func):
    ft = '\n'.join(getsource(func).split('\n')[1:]) # Removing the decorator, of course
    ft = ft.replace(func.__name__, 'func')          # Making function callable with 'func'
    ft = ft.replace('#§ ', '').replace('#§', '')    # For using commented code starting with '#§'
    ft = ft.strip()                                 # In case the function code was indented
    return ft

It is used in the following _worker() function that will be the target of the processes:

def _worker(functext, args):
    scope = {}               # This is needed to keep executed definitions
    exec(functext, scope)
    scope['func'](args)      # Using func from scope

And finally, here's my decorator:

from multiprocessing import Process 

def parallel(num_processes, **kwargs):
    def parallel_decorator(func, num_processes=num_processes):
        functext = make_functext(func)
        print('This is the parallelized function:\n', functext)
        def function_wrapper(funcargs, num_processes=num_processes):
            workers = []
            print('Launching processes...')
            for k in range(num_processes):
                p = Process(target=_worker, args=(functext, funcargs[k])) # use args here
                p.start()
                workers.append(p)
        return function_wrapper
    return parallel_decorator

The code can finally be used by defining a function like this:

@parallel(4)
def hello(args):
    #§ from time import sleep     # use '#§' to avoid unnecessary (re)imports in main program
    name, seconds = tuple(args)   # unpack args-list here
    sleep(seconds)
    print('Hi', name)

... which can now be called like this:

hello([['Marty', 0.5],
       ['Catherine', 0.9],
       ['Tyler', 0.7],
       ['Pavel', 0.3]])

... which outputs:

This is the parallelized function:
 def func(args):
        from time import sleep
        name, seconds = tuple(args)
        sleep(seconds)
        print('Hi', name)
Launching processes...
Hi Pavel
Hi Marty
Hi Tyler
Hi Catherine

Thanks for reading, this is my very first post. If you find any mistakes or bad practices, feel free to leave a comment. I know that these string conversions are quite dirty, though...

like image 38
N. Jonas Figge Avatar answered Oct 17 '22 19:10

N. Jonas Figge