I would like to use the multiprocessing library in Python. Sadly, multiprocessing uses pickle, which doesn't support functions with closures, lambdas, or functions in __main__. All three of these are important to me:
    In [1]: import pickle

    In [2]: pickle.dumps(lambda x: x)
    PicklingError: Can't pickle <function <lambda> at 0x23c0e60>: it's not found as __main__.<lambda>
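The closure case fails the same way: pickle stores plain functions by reference, and a function returned from another function can't be looked up by name at module level. For example, in a fresh session (adder is my own illustrative function, not from the original post):

    In [1]: import pickle

    In [2]: def adder(n):
       ...:     def add(x):
       ...:         return x + n
       ...:     return add

    In [3]: pickle.dumps(adder(1))
    PicklingError: Can't pickle <function add at 0x...>: it's not found as __main__.add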
Fortunately there is dill, a more robust pickle. Apparently dill performs magic on import to make pickle work:
    In [3]: import dill

    In [4]: pickle.dumps(lambda x: x)
    Out[4]: "cdill.dill\n_load_type\np0\n(S'FunctionType'\np1 ...
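The magic goes both ways: once dill is imported, plain pickle.loads can rebuild a function from that byte string, since the payload only references dill's own constructors. A quick check (my own continuation of the session above, not from the original post):

    In [5]: f = pickle.loads(pickle.dumps(lambda x: x**2))

    In [6]: f(3)
    Out[6]: 9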
This is very encouraging, particularly because I don't have access to the multiprocessing source code. Sadly, I still can't get this very basic example to work:
    import multiprocessing as mp
    import dill

    p = mp.Pool(4)
    print p.map(lambda x: x**2, range(10))
Why is this? What am I missing? Exactly what are the limitations of the multiprocessing + dill combination?
Temporary edit for J.F. Sebastian:

    mrockli@mrockli-notebook:~/workspace/toolz$ python testmp.py
    Exception in thread Thread-2:
    Traceback (most recent call last):
      File "/home/mrockli/Software/anaconda/lib/python2.7/threading.py", line 808, in __bootstrap_inner
        self.run()
      File "/home/mrockli/Software/anaconda/lib/python2.7/threading.py", line 761, in run
        self.__target(*self.__args, **self.__kwargs)
      File "/home/mrockli/Software/anaconda/lib/python2.7/multiprocessing/pool.py", line 342, in _handle_tasks
        put(task)
    PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
    ^C
    ...lots of junk...

    [DEBUG/MainProcess] cleaning up worker 3
    [DEBUG/MainProcess] cleaning up worker 2
    [DEBUG/MainProcess] cleaning up worker 1
    [DEBUG/MainProcess] cleaning up worker 0
    [DEBUG/MainProcess] added worker
    [DEBUG/MainProcess] added worker
    [INFO/PoolWorker-5] child process calling self.run()
    [INFO/PoolWorker-6] child process calling self.run()
    [DEBUG/MainProcess] added worker
    [INFO/PoolWorker-7] child process calling self.run()
    [DEBUG/MainProcess] added worker
    [INFO/PoolWorker-8] child process calling self.run()
The pickle module can serialize most of Python's objects, but there are a few types it cannot handle, including lambda expressions, multiprocessing objects, threading objects, database connections, etc. The dill module can serve as a great alternative for serializing these unpicklable objects.

dill can be used to store Python objects to a file, but its primary usage is to send Python objects across the network as a byte stream. dill is quite flexible, and allows arbitrary user-defined classes and functions to be serialized.
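For example, here's a minimal sketch of both uses (session.pkl is just an illustrative filename):

    import dill

    # A lambda: exactly the kind of object the stdlib pickle rejects.
    squared = lambda x: x**2

    # As a byte stream, e.g. for shipping over a socket or a pipe...
    payload = dill.dumps(squared)
    assert dill.loads(payload)(3) == 9

    # ...or stored to a file on disk.
    with open('session.pkl', 'wb') as f:
        dill.dump(squared, f)
    with open('session.pkl', 'rb') as f:
        restored = dill.load(f)
    print restored(4)   # prints 16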
multiprocessing makes some bad choices about pickling. Don't get me wrong, it makes some good choices that enable it to pickle certain types so they can be used in a pool's map function. However, since we have dill that can do the pickling, multiprocessing's own pickling becomes a bit limiting. Actually, if multiprocessing were to use pickle instead of cPickle... and also drop some of its own pickling overrides, then dill could take over and give a much fuller serialization for multiprocessing.
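You can see the point in practice by routing around the pool's internal pickler by hand: dill serializes the troublesome callable to a plain byte string, so multiprocessing only ever pickles that string plus one top-level function. This is a workaround sketch of my own; run_dill_encoded and apply_async_with_dill are hypothetical helper names, not part of any library:

    import multiprocessing as mp
    import dill

    def run_dill_encoded(payload):
        # Runs in the worker: rebuild the real function and arguments
        # from the dill byte string, then call it.
        fun, args = dill.loads(payload)
        return fun(*args)

    def apply_async_with_dill(pool, fun, args):
        # Wrap the unpicklable callable in a dill byte string so that
        # multiprocessing's cPickle only ever sees a str and a
        # top-level function.
        payload = dill.dumps((fun, args))
        return pool.apply_async(run_dill_encoded, (payload,))

    if __name__ == '__main__':
        pool = mp.Pool(4)
        jobs = [apply_async_with_dill(pool, lambda x: x**2, (i,)) for i in range(10)]
        print [job.get() for job in jobs]   # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

This hand-wrapping is essentially what the fork below automates for you.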
Until that happens, there's a fork of multiprocessing called pathos (the release version is a bit stale, unfortunately) that removes the above limitations. Pathos also adds some nice features that multiprocessing doesn't have, like multi-args in the map function. Pathos is due for a release, after some mild updating -- mostly conversion to Python 3.x.
    Python 2.7.5 (default, Sep 30 2013, 20:15:49)
    [GCC 4.2.1 (Apple Inc. build 5566)] on darwin
    Type "help", "copyright", "credits" or "license" for more information.
    >>> import dill
    >>> from pathos.multiprocessing import ProcessingPool
    >>> pool = ProcessingPool(nodes=4)
    >>> result = pool.map(lambda x: x**2, range(10))
    >>> result
    [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
and just to show off a little of what pathos.multiprocessing can do...
    >>> def busy_add(x, y, delay=0.01):
    ...     for n in range(x):
    ...         x += n
    ...     for n in range(y):
    ...         y -= n
    ...     import time
    ...     time.sleep(delay)
    ...     return x + y
    ...
    >>> def busy_squared(x):
    ...     import time, random
    ...     time.sleep(2*random.random())
    ...     return x*x
    ...
    >>> def squared(x):
    ...     return x*x
    ...
    >>> def quad_factory(a=1, b=1, c=0):
    ...     def quad(x):
    ...         return a*x**2 + b*x + c
    ...     return quad
    ...
    >>> square_plus_one = quad_factory(2, 0, 1)
    >>>
    >>> def test1(pool):
    ...     print pool
    ...     print "x: %s\n" % str(x)
    ...     print pool.map.__name__
    ...     start = time.time()
    ...     res = pool.map(squared, x)
    ...     print "time to results:", time.time() - start
    ...     print "y: %s\n" % str(res)
    ...     print pool.imap.__name__
    ...     start = time.time()
    ...     res = pool.imap(squared, x)
    ...     print "time to queue:", time.time() - start
    ...     start = time.time()
    ...     res = list(res)
    ...     print "time to results:", time.time() - start
    ...     print "y: %s\n" % str(res)
    ...     print pool.amap.__name__
    ...     start = time.time()
    ...     res = pool.amap(squared, x)
    ...     print "time to queue:", time.time() - start
    ...     start = time.time()
    ...     res = res.get()
    ...     print "time to results:", time.time() - start
    ...     print "y: %s\n" % str(res)
    ...
    >>> def test2(pool, items=4, delay=0):
    ...     _x = range(-items/2, items/2, 2)
    ...     _y = range(len(_x))
    ...     _d = [delay]*len(_x)
    ...     print map
    ...     res1 = map(busy_squared, _x)
    ...     res2 = map(busy_add, _x, _y, _d)
    ...     print pool.map
    ...     _res1 = pool.map(busy_squared, _x)
    ...     _res2 = pool.map(busy_add, _x, _y, _d)
    ...     assert _res1 == res1
    ...     assert _res2 == res2
    ...     print pool.imap
    ...     _res1 = pool.imap(busy_squared, _x)
    ...     _res2 = pool.imap(busy_add, _x, _y, _d)
    ...     assert list(_res1) == res1
    ...     assert list(_res2) == res2
    ...     print pool.amap
    ...     _res1 = pool.amap(busy_squared, _x)
    ...     _res2 = pool.amap(busy_add, _x, _y, _d)
    ...     assert _res1.get() == res1
    ...     assert _res2.get() == res2
    ...     print ""
    ...
    >>> def test3(pool):  # test against a function that should fail in pickle
    ...     print pool
    ...     print "x: %s\n" % str(x)
    ...     print pool.map.__name__
    ...     start = time.time()
    ...     res = pool.map(square_plus_one, x)
    ...     print "time to results:", time.time() - start
    ...     print "y: %s\n" % str(res)
    ...
    >>> def test4(pool, maxtries, delay):
    ...     print pool
    ...     m = pool.amap(busy_add, x, x)
    ...     tries = 0
    ...     while not m.ready():
    ...         time.sleep(delay)
    ...         tries += 1
    ...         print "TRY: %s" % tries
    ...         if tries >= maxtries:
    ...             print "TIMEOUT"
    ...             break
    ...     print m.get()
    ...
    >>> import time
    >>> x = range(18)
    >>> delay = 0.01
    >>> items = 20
    >>> maxtries = 20
    >>> from pathos.multiprocessing import ProcessingPool as Pool
    >>> pool = Pool(nodes=4)
    >>> test1(pool)
    <pool ProcessingPool(ncpus=4)>
    x: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17]

    map
    time to results: 0.0553691387177
    y: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]

    imap
    time to queue: 7.91549682617e-05
    time to results: 0.102381229401
    y: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]

    amap
    time to queue: 7.08103179932e-05
    time to results: 0.0489699840546
    y: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]

    >>> test2(pool, items, delay)
    <built-in function map>
    <bound method ProcessingPool.map of <pool ProcessingPool(ncpus=4)>>
    <bound method ProcessingPool.imap of <pool ProcessingPool(ncpus=4)>>
    <bound method ProcessingPool.amap of <pool ProcessingPool(ncpus=4)>>

    >>> test3(pool)
    <pool ProcessingPool(ncpus=4)>
    x: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17]

    map
    time to results: 0.0523059368134
    y: [1, 3, 9, 19, 33, 51, 73, 99, 129, 163, 201, 243, 289, 339, 393, 451, 513, 579]

    >>> test4(pool, maxtries, delay)
    <pool ProcessingPool(ncpus=4)>
    TRY: 1
    TRY: 2
    TRY: 3
    TRY: 4
    TRY: 5
    TRY: 6
    TRY: 7
    [0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34]