I am trying to write an application that applies a function concurrently with a multiprocessing.Pool. I would like this function to be an instance method (so I can define it differently in different subclasses). This doesn't seem to be possible; as I have learned elsewhere, apparently bound methods can't be pickled. So why does starting a multiprocessing.Process with a bound method as a target work? The following code:
    import multiprocessing

    def test1():
        print "Hello, world 1"

    def increment(x):
        return x + 1

    class testClass():
        def process(self):
            process1 = multiprocessing.Process(target=test1)
            process1.start()
            process1.join()
            process2 = multiprocessing.Process(target=self.test2)
            process2.start()
            process2.join()

        def pool(self):
            pool = multiprocessing.Pool(1)
            for answer in pool.imap(increment, range(10)):
                print answer
            print
            for answer in pool.imap(self.square, range(10)):
                print answer

        def test2(self):
            print "Hello, world 2"

        def square(self, x):
            return x * x

    def main():
        c = testClass()
        c.process()
        c.pool()

    if __name__ == "__main__":
        main()
Produces this output:
    Hello, world 1
    Hello, world 2
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    Exception in thread Thread-2:
    Traceback (most recent call last):
      File "C:\Python27\Lib\threading.py", line 551, in __bootstrap_inner
        self.run()
      File "C:\Python27\Lib\threading.py", line 504, in run
        self.__target(*self.__args, **self.__kwargs)
      File "C:\Python27\Lib\multiprocessing\pool.py", line 319, in _handle_tasks
        put(task)
    PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed
Why can Processes handle bound methods, but not Pools?
Pool is generally used for heterogeneous tasks, whereas multiprocessing.Process is generally used for homogeneous tasks. That is, the Pool is designed to execute tasks that do not resemble each other; for example, each task submitted to the process pool may be a different target function.
Use Pool. The multiprocessing pool's starmap() function (available from Python 3.3) calls the target function with multiple arguments, so it can be used instead of map(). This is probably the preferred approach for executing a target function that takes multiple arguments in the multiprocessing pool.
An instance of the lock can be created and then acquired by processes before accessing a critical section, and released after the critical section. Only one process can have the lock at any time.
Pool allows multiple jobs per process, which may make it easier to parallelize your program. If you have a number of jobs to run in parallel, you can create a Pool with as many processes as there are CPU cores and then pass the list of jobs to pool.map.
The pickle module normally can't pickle instance methods:
    >>> import pickle
    >>> class A(object):
    ...     def z(self): print "hi"
    ...
    >>> a = A()
    >>> pickle.dumps(a.z)
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/usr/local/lib/python2.7/pickle.py", line 1374, in dumps
        Pickler(file, protocol).dump(obj)
      File "/usr/local/lib/python2.7/pickle.py", line 224, in dump
        self.save(obj)
      File "/usr/local/lib/python2.7/pickle.py", line 306, in save
        rv = reduce(self.proto)
      File "/usr/local/lib/python2.7/copy_reg.py", line 70, in _reduce_ex
        raise TypeError, "can't pickle %s objects" % base.__name__
    TypeError: can't pickle instancemethod objects
However, the multiprocessing module has a custom Pickler that adds some code to enable this feature:
    #
    # Try making some callable types picklable
    #

    from pickle import Pickler
    class ForkingPickler(Pickler):
        dispatch = Pickler.dispatch.copy()

        @classmethod
        def register(cls, type, reduce):
            def dispatcher(self, obj):
                rv = reduce(obj)
                self.save_reduce(obj=obj, *rv)
            cls.dispatch[type] = dispatcher

    def _reduce_method(m):
        if m.im_self is None:
            return getattr, (m.im_class, m.im_func.func_name)
        else:
            return getattr, (m.im_self, m.im_func.func_name)
    ForkingPickler.register(type(ForkingPickler.save), _reduce_method)
You can replicate this using the copy_reg module to see it work for yourself:
    >>> import copy_reg
    >>> def _reduce_method(m):
    ...     if m.im_self is None:
    ...         return getattr, (m.im_class, m.im_func.func_name)
    ...     else:
    ...         return getattr, (m.im_self, m.im_func.func_name)
    ...
    >>> copy_reg.pickle(type(a.z), _reduce_method)
    >>> pickle.dumps(a.z)
    "c__builtin__\ngetattr\np0\n(ccopy_reg\n_reconstructor\np1\n(c__main__\nA\np2\nc__builtin__\nobject\np3\nNtp4\nRp5\nS'z'\np6\ntp7\nRp8\n."
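If you want to try the same registration on Python 3, here is a rough port. It is only a sketch: the module is called copyreg there, the method attributes are `__self__` and `__func__`, and the `calls` list is something I added purely to show that the registered reducer is the one being used. Python 3 can already pickle bound methods without this, so the registration is for illustration only.

```python
import copyreg
import pickle
import types

calls = []  # illustration only: records which methods went through our reducer


def _reduce_method(m):
    # Python 3 names: m.__self__ replaces im_self, m.__func__ replaces im_func.
    # Python 3 has no unbound methods, so the "im_self is None" branch of the
    # Python 2 version is not needed here.
    calls.append(m.__func__.__name__)
    return getattr, (m.__self__, m.__func__.__name__)


class A(object):
    def z(self):
        return "hi"


# Global registration: every pickler in the process now uses _reduce_method
# for bound methods, not just one custom Pickler subclass.
copyreg.pickle(types.MethodType, _reduce_method)

a = A()
restored = pickle.loads(pickle.dumps(a.z))
print(restored())
print(calls)
```

The unpickled object is a bound method on a copy of `a`, rebuilt on load by calling `getattr(copy_of_a, "z")`.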
When you use Process.start to spawn a new process on Windows, it pickles all the parameters you passed to the child process using this custom ForkingPickler:
    #
    # Windows
    #

    else:
        # snip...

        from pickle import load, HIGHEST_PROTOCOL

        def dump(obj, file, protocol=None):
            ForkingPickler(file, protocol).dump(obj)

        #
        # We define a Popen class similar to the one from subprocess, but
        # whose constructor takes a process object as its argument.
        #

        class Popen(object):
            '''
            Start a subprocess to run the code of a process object
            '''
            _tls = thread._local()

            def __init__(self, process_obj):
                # create pipe for communication with child
                rfd, wfd = os.pipe()

                # get handle for read end of the pipe and make it inheritable
                ...
                # start process
                ...

                # set attributes of self
                ...

                # send information to child
                prep_data = get_preparation_data(process_obj._name)
                to_child = os.fdopen(wfd, 'wb')
                Popen._tls.process_handle = int(hp)
                try:
                    dump(prep_data, to_child, HIGHEST_PROTOCOL)
                    dump(process_obj, to_child, HIGHEST_PROTOCOL)
                finally:
                    del Popen._tls.process_handle
                    to_child.close()
Note the "send information to child" section. It's using the dump function, which uses ForkingPickler to pickle the data, which means your instance method can be pickled.
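The important property here is that the extra reduction rule lives in one Pickler subclass only, so nothing else in the process sees it. A minimal sketch of that idea using Python 3's documented dispatch_table attribute might look like the following. This is not multiprocessing's actual code, and `Job`, `_reduce_job`, `PrivatePickler` and `private_dumps` are hypothetical names made up for the illustration.

```python
import copyreg
import io
import pickle
import threading


class Job(object):
    """Hypothetical class that is not picklable by default (it holds a lock)."""

    def __init__(self, name):
        self.name = name
        self.lock = threading.Lock()


def _reduce_job(job):
    # Rebuild the object on the receiving side by calling Job(name);
    # the lock is recreated by __init__ instead of being pickled.
    return Job, (job.name,)


class PrivatePickler(pickle.Pickler):
    # Copy the global table so the extra rule stays private to this class.
    dispatch_table = copyreg.dispatch_table.copy()
    dispatch_table[Job] = _reduce_job


def private_dumps(obj):
    buf = io.BytesIO()
    PrivatePickler(buf).dump(obj)
    return buf.getvalue()


job = Job("resize-images")
restored = pickle.loads(private_dumps(job))
print(restored.name)

try:
    pickle.dumps(job)  # the plain pickler knows nothing about the private rule
except TypeError as exc:
    print("plain pickle failed:", exc)
```

Anything that goes through `PrivatePickler` can serialize a `Job`, while code that calls plain `pickle.dumps` still fails. That is the same split as between Process and Pool in the question.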
Now, when you use methods on multiprocessing.Pool to send a method to a child process, it's using a multiprocessing.Pipe to pickle the data. In Python 2.7, multiprocessing.Pipe is implemented in C, and calls pickle_dumps directly, so it doesn't take advantage of the ForkingPickler. That means pickling the instance method doesn't work.
However, if you use copy_reg to register the instancemethod type, rather than a custom Pickler, all attempts at pickling will be affected. So you can use that to enable pickling instance methods, even via Pool:
    import multiprocessing
    import copy_reg
    import types

    def _reduce_method(m):
        if m.im_self is None:
            return getattr, (m.im_class, m.im_func.func_name)
        else:
            return getattr, (m.im_self, m.im_func.func_name)
    copy_reg.pickle(types.MethodType, _reduce_method)

    def test1():
        print("Hello, world 1")

    def increment(x):
        return x + 1

    class testClass():
        def process(self):
            process1 = multiprocessing.Process(target=test1)
            process1.start()
            process1.join()
            process2 = multiprocessing.Process(target=self.test2)
            process2.start()
            process2.join()

        def pool(self):
            pool = multiprocessing.Pool(1)
            for answer in pool.imap(increment, range(10)):
                print(answer)
            print
            for answer in pool.imap(self.square, range(10)):
                print(answer)

        def test2(self):
            print("Hello, world 2")

        def square(self, x):
            return x * x

    def main():
        c = testClass()
        c.process()
        c.pool()

    if __name__ == "__main__":
        main()
Output:
    Hello, world 1
    Hello, world 2
    GOT (0, 0, (True, 1))
    GOT (0, 1, (True, 2))
    GOT (0, 2, (True, 3))
    GOT (0, 3, (True, 4))
    GOT (0, 4, (True, 5))
    1GOT (0, 5, (True, 6))
    GOT (0, 6, (True, 7))
    2
    GOT (0, 7, (True, 8))
    3
    GOT (0, 8, (True, 9))
    GOT (0, 9, (True, 10))
    4
    5
    6
    7
    8
    9
    10
    GOT (1, 0, (True, 0))
    0
    GOT (1, 1, (True, 1))
    1
    GOT (1, 2, (True, 4))
    4
    GOT (1, 3, (True, 9))
    9
    GOT (1, 4, (True, 16))
    16
    GOT (1, 5, (True, 25))
    25
    GOT (1, 6, (True, 36))
    36
    GOT (1, 7, (True, 49))
    49
    GOT (1, 8, (True, 64))
    64
    GOT (1, 9, (True, 81))
    81
    GOT None
Also note that in Python 3.x, pickle can pickle instance method types natively, so none of this stuff matters any more. :)
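A quick way to convince yourself on Python 3 is the sketch below. As far as I know, CPython reduces a bound method to the same getattr call that the Python 2 workaround registers by hand, but treat the `__reduce__` inspection as an implementation detail rather than a documented guarantee. The class `A` and its `scale` method are made up for the example.

```python
import pickle


class A(object):
    def __init__(self, factor):
        self.factor = factor

    def scale(self, x):
        return self.factor * x


a = A(3)

# Inspect how the bound method describes itself to pickle.
func, args = a.scale.__reduce__()
print(func is getattr)
print(args[1])

# Round-trip the bound method itself, with no copyreg registration needed.
restored = pickle.loads(pickle.dumps(a.scale))
print(restored(5))
```

Since the instance is pickled along with the method name, the instance itself still has to be picklable.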
Here's an alternative that I use sometimes, and it works in Python 2.x:
You can create a top-level "alias" of sorts for an instance method: a function that accepts the object whose instance method you want to run in a pool and calls that method for you:
    import functools
    import multiprocessing

    def _instance_method_alias(obj, arg):
        """
        Alias for instance method that allows the method to be called in a
        multiprocessing pool
        """
        obj.instance_method(arg)
        return

    class MyClass(object):
        """
        Our custom class whose instance methods we want to be able to use in a
        multiprocessing pool
        """

        def __init__(self):
            self.my_string = "From MyClass: {}"

        def instance_method(self, arg):
            """
            Some arbitrary instance method
            """
            print(self.my_string.format(arg))
            return

    # the guard is needed on Windows, where child processes re-import this module
    if __name__ == "__main__":
        # create an object of MyClass
        obj = MyClass()

        # use functools.partial to create a new method that always has the
        # MyClass object passed as its first argument
        _bound_instance_method_alias = functools.partial(_instance_method_alias, obj)

        # create our list of things we will use the pool to map
        l = [1, 2, 3]

        # create the pool of workers
        pool = multiprocessing.Pool()

        # call pool.map, passing it the newly created function
        pool.map(_bound_instance_method_alias, l)

        # cleanup
        pool.close()
        pool.join()
This code produces this output:
From MyClass: 1
From MyClass: 2
From MyClass: 3
One limitation is that you can't use this for methods that modify the object. Each process gets a copy of the object it is calling the methods on, so changes won't be propagated back to the main process. If you don't need to modify the object from the methods you're calling though, this can be a simple solution.
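To make the copy behaviour concrete without starting any processes, here is a small sketch that simulates the hand-off with a pickle round trip, which is roughly what happens to the partial on its way to a worker. `Tally` and `_bump_alias` are hypothetical names for this example only.

```python
import functools
import pickle


class Tally(object):
    def __init__(self):
        self.count = 0

    def bump(self, amount):
        self.count += amount
        return self.count


def _bump_alias(obj, amount):
    # Top-level alias, same pattern as _instance_method_alias above,
    # except that it returns the method's result.
    return obj.bump(amount)


tally = Tally()
task = functools.partial(_bump_alias, tally)

# Simulate sending the task to a worker: the worker unpickles its own copy
# of the partial, and therefore its own copy of the Tally object.
worker_task = pickle.loads(pickle.dumps(task))
worker_result = worker_task(5)

print(worker_result)  # value computed on the worker's copy
print(tally.count)    # the original object is untouched
```

If the parent needs the outcome, have the method return it and collect the return values from pool.map rather than relying on attribute changes.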