Why can I pass an instance method to multiprocessing.Process, but not a multiprocessing.Pool?

I am trying to write an application that applies a function concurrently with a multiprocessing.Pool. I would like this function to be an instance method (so I can define it differently in different subclasses). This doesn't seem to be possible; as I have learned elsewhere, apparently bound methods can't be pickled. So why does starting a multiprocessing.Process with a bound method as a target work? The following code:

import multiprocessing

def test1():
    print "Hello, world 1"

def increment(x):
    return x + 1

class testClass():
    def process(self):
        process1 = multiprocessing.Process(target=test1)
        process1.start()
        process1.join()
        process2 = multiprocessing.Process(target=self.test2)
        process2.start()
        process2.join()

    def pool(self):
        pool = multiprocessing.Pool(1)
        for answer in pool.imap(increment, range(10)):
            print answer
        print
        for answer in pool.imap(self.square, range(10)):
            print answer

    def test2(self):
        print "Hello, world 2"

    def square(self, x):
        return x * x

def main():
    c = testClass()
    c.process()
    c.pool()

if __name__ == "__main__":
    main()

Produces this output:

Hello, world 1
Hello, world 2
1
2
3
4
5
6
7
8
9
10

Exception in thread Thread-2:
Traceback (most recent call last):
  File "C:\Python27\Lib\threading.py", line 551, in __bootstrap_inner
    self.run()
  File "C:\Python27\Lib\threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "C:\Python27\Lib\multiprocessing\pool.py", line 319, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed

Why can Processes handle bound methods, but not Pools?

dpitch40 asked Dec 05 '14 14:12


2 Answers

The pickle module normally can't pickle instance methods:

>>> import pickle
>>> class A(object):
...  def z(self): print "hi"
...
>>> a = A()
>>> pickle.dumps(a.z)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python2.7/pickle.py", line 1374, in dumps
    Pickler(file, protocol).dump(obj)
  File "/usr/local/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/local/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
  File "/usr/local/lib/python2.7/copy_reg.py", line 70, in _reduce_ex
    raise TypeError, "can't pickle %s objects" % base.__name__
TypeError: can't pickle instancemethod objects

However, the multiprocessing module has a custom Pickler that adds some code to enable this feature:

#
# Try making some callable types picklable
#

from pickle import Pickler
class ForkingPickler(Pickler):
    dispatch = Pickler.dispatch.copy()

    @classmethod
    def register(cls, type, reduce):
        def dispatcher(self, obj):
            rv = reduce(obj)
            self.save_reduce(obj=obj, *rv)
        cls.dispatch[type] = dispatcher

def _reduce_method(m):
    if m.im_self is None:
        return getattr, (m.im_class, m.im_func.func_name)
    else:
        return getattr, (m.im_self, m.im_func.func_name)
ForkingPickler.register(type(ForkingPickler.save), _reduce_method)

You can replicate this using the copy_reg module to see it work for yourself:

>>> import copy_reg
>>> def _reduce_method(m):
...     if m.im_self is None:
...         return getattr, (m.im_class, m.im_func.func_name)
...     else:
...         return getattr, (m.im_self, m.im_func.func_name)
...
>>> copy_reg.pickle(type(a.z), _reduce_method)
>>> pickle.dumps(a.z)
"c__builtin__\ngetattr\np0\n(ccopy_reg\n_reconstructor\np1\n(c__main__\nA\np2\nc__builtin__\nobject\np3\nNtp4\nRp5\nS'z'\np6\ntp7\nRp8\n."

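When you use Process.start to spawn a new process on Windows, it pickles the Process object (including its target and arguments) and sends it to the child using this custom ForkingPickler. (On Unix, Process.start uses os.fork, so the child inherits the target directly and nothing needs to be pickled at all.)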

#
# Windows
#

else:
    # snip...
    from pickle import load, HIGHEST_PROTOCOL

    def dump(obj, file, protocol=None):
        ForkingPickler(file, protocol).dump(obj)

    #
    # We define a Popen class similar to the one from subprocess, but
    # whose constructor takes a process object as its argument.
    #

    class Popen(object):
        '''
        Start a subprocess to run the code of a process object
        '''
        _tls = thread._local()

        def __init__(self, process_obj):
            # create pipe for communication with child
            rfd, wfd = os.pipe()

            # get handle for read end of the pipe and make it inheritable
            ...
            # start process
            ...

            # set attributes of self
            ...

            # send information to child
            prep_data = get_preparation_data(process_obj._name)
            to_child = os.fdopen(wfd, 'wb')
            Popen._tls.process_handle = int(hp)
            try:
                dump(prep_data, to_child, HIGHEST_PROTOCOL)
                dump(process_obj, to_child, HIGHEST_PROTOCOL)
            finally:
                del Popen._tls.process_handle
                to_child.close()

Note the "send information to child" section. It uses the dump function, which pickles the data with ForkingPickler, so your instance method can be pickled.
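In Python 3 the same pickler lives on as `multiprocessing.reduction.ForkingPickler`, and the "spawn" start method (the only one available on Windows) reproduces the behavior described above on any platform. A minimal sketch, assuming Python 3.4+ for `multiprocessing.get_context`; `Greeter`, `forking_roundtrip` and `start_with_spawn` are illustrative names:

```python
import multiprocessing
import pickle
from multiprocessing.reduction import ForkingPickler


class Greeter(object):
    def test2(self):
        print("Hello, world 2")


def forking_roundtrip(method):
    # ForkingPickler.dumps returns a memoryview; copy it to bytes and load it
    # back with the ordinary unpickler.
    data = bytes(ForkingPickler.dumps(method))
    return pickle.loads(data)


def start_with_spawn(obj):
    # The "spawn" start method pickles the Process object (target included)
    # and sends it to a fresh interpreter, as Windows always does.
    ctx = multiprocessing.get_context("spawn")
    process = ctx.Process(target=obj.test2)
    process.start()
    process.join()
    return process.exitcode


if __name__ == "__main__":
    forking_roundtrip(Greeter().test2)()  # runs the restored method here
    start_with_spawn(Greeter())           # runs it in a spawned child
```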

Now, when you use the methods of multiprocessing.Pool to send a function to a child process, the task is sent over a multiprocessing.Pipe, which does its own pickling. In Python 2.7, the connection objects created by multiprocessing.Pipe are implemented in C and call pickle_dumps directly, so they don't take advantage of ForkingPickler. That means pickling the instance method fails.
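The key distinction is between a reducer registered on one Pickler subclass and one registered globally. The sketch below, in Python 3, uses the documented `dispatch_table` class attribute of `pickle.Pickler` subclasses to play the role of ForkingPickler. `Handle` is a hypothetical type that refuses default pickling, standing in for the instance method type, and plain `pickle.dumps` stands in for the C Pipe code: only the custom pickler can handle it.

```python
import copyreg
import io
import pickle


class Handle(object):
    """Hypothetical type that refuses to be pickled by default."""

    def __init__(self, name):
        self.name = name

    def __reduce__(self):
        raise TypeError("Handle objects can't be pickled by default")


def _reduce_handle(h):
    # Recreate the object as Handle(name) when unpickling.
    return Handle, (h.name,)


class CustomPickler(pickle.Pickler):
    # A private copy of the global table: the extra reducer only affects
    # this pickler, the way ForkingPickler's registrations did.
    dispatch_table = copyreg.dispatch_table.copy()
    dispatch_table[Handle] = _reduce_handle


def custom_dumps(obj):
    buf = io.BytesIO()
    CustomPickler(buf).dump(obj)
    return buf.getvalue()


if __name__ == "__main__":
    print(pickle.loads(custom_dumps(Handle("pipe"))).name)  # prints: pipe
    try:
        pickle.dumps(Handle("pipe"))
    except TypeError as exc:
        print("plain pickle failed:", exc)
```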

However, if you use copy_reg to register a reducer for the instancemethod type, rather than a custom Pickler, all attempts at pickling will be affected. So you can use that to enable pickling instance methods, even via Pool:

import multiprocessing
import copy_reg
import types

def _reduce_method(m):
    if m.im_self is None:
        return getattr, (m.im_class, m.im_func.func_name)
    else:
        return getattr, (m.im_self, m.im_func.func_name)
copy_reg.pickle(types.MethodType, _reduce_method)

def test1():
    print("Hello, world 1")

def increment(x):
    return x + 1

class testClass():
    def process(self):
        process1 = multiprocessing.Process(target=test1)
        process1.start()
        process1.join()
        process2 = multiprocessing.Process(target=self.test2)
        process2.start()
        process2.join()

    def pool(self):
        pool = multiprocessing.Pool(1)
        for answer in pool.imap(increment, range(10)):
            print(answer)
        print
        for answer in pool.imap(self.square, range(10)):
            print(answer)

    def test2(self):
        print("Hello, world 2")

    def square(self, x):
        return x * x

def main():
    c = testClass()
    c.process()
    c.pool()

if __name__ == "__main__":
    main()

Output:

Hello, world 1
Hello, world 2
GOT (0, 0, (True, 1))
GOT (0, 1, (True, 2))
GOT (0, 2, (True, 3))
GOT (0, 3, (True, 4))
GOT (0, 4, (True, 5))
1GOT (0, 5, (True, 6))
GOT (0, 6, (True, 7))
2
GOT (0, 7, (True, 8))
3
GOT (0, 8, (True, 9))
GOT (0, 9, (True, 10))
4
5
6
7
8
9
10

GOT (1, 0, (True, 0))
0
GOT (1, 1, (True, 1))
1
GOT (1, 2, (True, 4))
4
GOT (1, 3, (True, 9))
9
GOT (1, 4, (True, 16))
16
GOT (1, 5, (True, 25))
25
GOT (1, 6, (True, 36))
36
GOT (1, 7, (True, 49))
49
GOT (1, 8, (True, 64))
64
GOT (1, 9, (True, 81))
81
GOT None

Also note that in Python 3.x, pickle can pickle instance method types natively, so none of this stuff matters any more. :)
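On Python 3 the original testClass example works without any copy_reg registration. A minimal sketch, assuming Python 3.4 or later (for native bound-method pickling and Pool as a context manager); `TestClass` and `roundtrip` are illustrative names:

```python
import multiprocessing
import pickle


class TestClass(object):
    def square(self, x):
        return x * x


def roundtrip(method):
    # The stock pickle module handles bound methods natively here.
    return pickle.loads(pickle.dumps(method))


if __name__ == "__main__":
    c = TestClass()
    print(roundtrip(c.square)(7))  # prints: 49
    # No copyreg registration needed: Pool can send the bound method directly.
    with multiprocessing.Pool(1) as pool:
        print(list(pool.imap(c.square, range(10))))
        # prints: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```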

dano answered Oct 04 '22 04:10


Here's an alternative that I use sometimes, and it works in Python 2.x:

You can create a top-level function that acts as an "alias" for an instance method: it accepts the object whose instance method you want to run in a pool and calls that method for you:

import functools
import multiprocessing

def _instance_method_alias(obj, arg):
    """
    Alias for instance method that allows the method to be called in a
    multiprocessing pool
    """
    obj.instance_method(arg)
    return

class MyClass(object):
    """
    Our custom class whose instance methods we want to be able to use in a
    multiprocessing pool
    """

    def __init__(self):
        self.my_string = "From MyClass: {}"

    def instance_method(self, arg):
        """
        Some arbitrary instance method
        """

        print(self.my_string.format(arg))
        return

# the __main__ guard is required on Windows, where each worker process
# re-imports this module and would otherwise create pools of its own
if __name__ == "__main__":
    # create an object of MyClass
    obj = MyClass()

    # use functools.partial to create a new function that always has the
    # MyClass object passed as its first argument
    _bound_instance_method_alias = functools.partial(_instance_method_alias, obj)

    # create our list of things we will use the pool to map
    l = [1, 2, 3]

    # create the pool of workers
    pool = multiprocessing.Pool()

    # call pool.map, passing it the newly created function
    pool.map(_bound_instance_method_alias, l)

    # cleanup
    pool.close()
    pool.join()

This code produces this output (the line order can vary, because each line is printed by whichever worker handles that item):

From MyClass: 1
From MyClass: 2
From MyClass: 3
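The same idea can be generalized so that one top-level helper works for any method, by passing the method's name along with the object. A sketch under that assumption: `call_method` and `method_as_picklable` are hypothetical helper names, and `instance_method` here returns its string instead of printing it so that `pool.map` hands the results back in input order.

```python
import functools
import multiprocessing


def call_method(obj, method_name, arg):
    # Top-level (hence picklable) helper: look the method up on obj and call it.
    return getattr(obj, method_name)(arg)


def method_as_picklable(obj, method_name):
    # A partial of a module-level function pickles fine, unlike a bound
    # method on Python 2.x.
    return functools.partial(call_method, obj, method_name)


class MyClass(object):
    def __init__(self):
        self.my_string = "From MyClass: {}"

    def instance_method(self, arg):
        return self.my_string.format(arg)


if __name__ == "__main__":
    func = method_as_picklable(MyClass(), "instance_method")
    pool = multiprocessing.Pool()
    try:
        print(pool.map(func, [1, 2, 3]))
        # prints: ['From MyClass: 1', 'From MyClass: 2', 'From MyClass: 3']
    finally:
        pool.close()
        pool.join()
```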

One limitation is that you can't use this for methods that need to modify the object. Each worker process gets its own copy of the object it calls the method on, so changes won't be propagated back to the main process. If the methods you're calling don't need to modify the object, though, this can be a simple solution.
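The limitation can be seen without a pool at all: what a worker receives is the result of pickling and unpickling the object, so any mutation happens on a copy. In this sketch `simulate_worker_call` is a hypothetical helper that mimics that round trip and `Counter` is illustrative; the `__main__` block shows the same effect with a real Pool. If a worker's changes matter, return them from the method and merge them in the main process.

```python
import functools
import multiprocessing
import pickle


class Counter(object):
    def __init__(self):
        self.count = 0

    def increment(self, step):
        self.count += step
        return self.count


def _increment_alias(obj, step):
    # Top-level alias in the style of the answer above.
    return obj.increment(step)


def simulate_worker_call(obj, step):
    # Mimic what Pool does: the worker only ever sees an unpickled copy.
    worker_copy = pickle.loads(pickle.dumps(obj))
    return _increment_alias(worker_copy, step)


if __name__ == "__main__":
    counter = Counter()
    pool = multiprocessing.Pool(2)
    try:
        pool.map(functools.partial(_increment_alias, counter), [1, 2, 3])
    finally:
        pool.close()
        pool.join()
    print(counter.count)  # prints: 0, the parent's object was never touched
```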

skrrgwasme answered Oct 04 '22 05:10