
Python NotImplementedError: pool objects cannot be passed between processes

I'm trying to dispatch work to a pool whenever a page is appended to the pages list, but my code raises a NotImplementedError. Here is the code showing what I'm trying to do:

Code:

from multiprocessing import Pool, current_process
import time
import random
import copy_reg
import types
import threading


class PageControler(object):
    def __init__(self):
        self.nProcess = 3
        self.pages = [1,2,3,4,5,6,7,8,9,10]
        self.manageWork()


    def manageWork(self):

        self.pool = Pool(processes=self.nProcess)

        time.sleep(2)
        work_queue = threading.Thread(target=self.modifyQueue)
        work_queue.start()

        #pool.close()
        #pool.join()

    def deliverWork(self):    
        if self.pages != []:
            pag = self.pages.pop()
            self.pool.apply_async(self.myFun)


    def modifyQueue(self):
        t = time.time()
        while (time.time()-t) < 10:
            time.sleep(1)
            self.pages.append(99)
            print self.pages
            self.deliverWork()

    def myFun(self):
        time.sleep(2)


if __name__ == '__main__':
    def _pickle_method(m):
        if m.im_self is None:
            return getattr, (m.im_class, m.im_func.func_name)
        else:
            return getattr, (m.im_self, m.im_func.func_name)

    copy_reg.pickle(types.MethodType, _pickle_method)

    PageControler()

Output:

NotImplementedError: pool objects cannot be passed between processes or pickled

Is there any way to pass the pool object between processes?

Edit:

I'm using Python 2.6

flaviussn asked Aug 19 '14 11:08


2 Answers

In order to pickle the instance method you're trying to pass to the Pool, Python needs to pickle the entire PageControler object, including its instance variables. One of those instance variables is the Pool object itself, and Pool objects can't be pickled, hence the error. You can work around this by implementing __getstate__ on the object, and using that to remove the pool object from the instance prior to pickling:

class PageControler(object):
    def __init__(self):
        self.nProcess = 3
        self.pages = [1,2,3,4,5,6,7,8,9,10]
        self.manageWork()


    def manageWork(self):

        self.pool = Pool(processes=self.nProcess)

        time.sleep(2)
        work_queue = threading.Thread(target=self.modifyQueue)
        work_queue.start()

        #pool.close()
        #pool.join()

    def deliverWork(self):    
        if self.pages != []:
            pag = self.pages.pop()
            self.pool.apply_async(self.myFun)


    def modifyQueue(self):
        t = time.time()
        while (time.time()-t) < 10:
            time.sleep(1)
            self.pages.append(99)
            print self.pages
            self.deliverWork()

    def myFun(self):
        time.sleep(2)

    def __getstate__(self):
        self_dict = self.__dict__.copy()
        del self_dict['pool']
        return self_dict

    def __setstate__(self, state):
        self.__dict__.update(state)

If defined, __getstate__ is called prior to pickling an object, and allows you to specify exactly which pieces of the object's state should actually be pickled. Then, upon unpickling, __setstate__(state) will be called if it's implemented (it is in our case); if it's not, the dict returned by __getstate__ will be used as the __dict__ for the unpickled instance. In the above example, we explicitly update __dict__ with the dict we returned from __getstate__, but we could have left __setstate__ unimplemented and gotten the same effect. Note that the unpickled copy in the worker process has no pool attribute.
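The round trip described above can be tried without starting any worker processes. This is a minimal sketch, assuming Python 3 and using a threading.Lock as a stand-in for the unpicklable Pool; the class name Holder and its attributes are hypothetical, not part of the original code.

```python
import pickle
import threading


class Holder(object):
    """Hypothetical class holding one attribute that cannot be pickled."""

    def __init__(self):
        self.pages = [1, 2, 3]
        # Stand-in for the Pool: lock objects cannot be pickled either.
        self.resource = threading.Lock()

    def __getstate__(self):
        # Copy the instance dict so the live object keeps its resource.
        state = self.__dict__.copy()
        del state['resource']
        return state
    # No __setstate__ here: the returned dict becomes the copy's __dict__.


original = Holder()
restored = pickle.loads(pickle.dumps(original))

print(restored.pages)                 # [1, 2, 3]
print(hasattr(restored, 'resource'))  # False
print(hasattr(original, 'resource'))  # True
```

The original object is untouched because __getstate__ works on a copy of __dict__; only the pickled copy loses the attribute.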

dano answered Oct 05 '22 02:10


Dano's answer is a good approach if you must pass the entire object to the worker process. In your case, though, the function you are passing to the pool doesn't need a reference to the class instance, so an alternative is to make it a static method with the @staticmethod decorator. If the function does need one or two member variables, pass them in as arguments when they are only read, and apply any updates in a callback when a write is required (you need the callback in any case if you want to update the instance in the parent process, since changes made in a worker are not visible there).

For example:

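import multiprocessing

class A(object):

    def __init__(self):
        self._pool = multiprocessing.Pool(1)
        self.member_variable = 1

    @staticmethod
    def MyFunc(variable):
        # Runs in a worker process on a copy of the value, not on the instance.
        # On Python 2, pickle looks functions up by module-level name, so this
        # may need to be a module-level function rather than a static method.
        variable += 1
        return variable

    def Callback(self, return_val):
        # Runs in the parent process once the worker has returned.
        self.member_variable = return_val

    def CallFuncAsync(self):
        # The pool lives on the instance, so it is reached through self._pool.
        self._pool.apply_async(self.MyFunc, (self.member_variable,), callback=self.Callback)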
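To see why a function with no reference to the instance avoids the problem, the sketch below compares what pickle does with a plain function and with a bound method. It assumes Python 3 (where bound methods pickle without a copy_reg helper), substitutes a module-level function for the static method because functions are pickled by name, and uses a threading.Lock as a stand-in for the Pool. Controller, work and can_pickle are hypothetical names.

```python
import pickle
import threading


def work(page):
    """Module-level function: pickled as a reference to its name."""
    return page + 1


class Controller(object):
    def __init__(self):
        self.pages = [1, 2, 3]
        # Stand-in for the Pool: lock objects cannot be pickled either.
        self.resource = threading.Lock()

    def method_work(self):
        return len(self.pages)


def can_pickle(obj):
    """Return True if pickle.dumps accepts obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


controller = Controller()

# The plain function carries no instance state with it.
function_ok = can_pickle(work)

# The bound method drags the whole instance, lock included, into the pickle.
method_ok = can_pickle(controller.method_work)

print(function_ok, method_ok)
```

Only the name of work and its arguments would cross the process boundary, which is why passing the value as an argument and applying the result in a callback sidesteps the error.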

Matt answered Oct 05 '22 00:10