What can multiprocessing and dill do together?

I would like to use the multiprocessing library in Python. Sadly, multiprocessing uses pickle, which doesn't support functions with closures, lambdas, or functions in __main__. All three of these are important to me.

    In [1]: import pickle

    In [2]: pickle.dumps(lambda x: x)
    PicklingError: Can't pickle <function <lambda> at 0x23c0e60>: it's not found as __main__.<lambda>
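The failure comes from the way the standard pickle module handles functions: it serializes them by reference, recording only the module and qualified name and expecting to find the function there again when unpickling. The short Python 3 sketch below (square, make_adder and can_pickle are illustrative names, not from the question) shows which kinds of function survive that lookup when the file is run as a script. A top-level function pickles fine here; the __main__ problem appears when the process doing the unpickling cannot import the same __main__, for example for functions typed into an interactive session when workers are started with the spawn method.

```python
import pickle

def square(x):
    # Top-level function: pickle records only its module and name.
    return x * x

def make_adder(n):
    def add(x):
        # Closure: its qualified name is "make_adder.<locals>.add", which cannot be looked up.
        return x + n
    return add

def can_pickle(obj):
    """Return True if the stock pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        # The exception type raised for local objects differs between Python versions.
        return False

results = {
    "top-level function": can_pickle(square),
    "lambda": can_pickle(lambda x: x),
    "closure": can_pickle(make_adder(1)),
}
print(results)  # {'top-level function': True, 'lambda': False, 'closure': False}
```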

Fortunately, there is dill, a more robust pickle. Apparently dill performs some magic on import to make pickle work:

    In [3]: import dill

    In [4]: pickle.dumps(lambda x: x)
    Out[4]: "cdill.dill\n_load_type\np0\n(S'FunctionType'\np1 ...
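dill's real machinery is considerably more involved, but the core idea of serializing a function by value (its code) rather than by name can be sketched with the standard library alone. The toy below is not dill's implementation: ByValuePickler, _rebuild_function and dumps_by_value are hypothetical names, it needs Python 3.8+ for Pickler.reducer_override, it only handles lambdas without closures or defaults, and marshal output is only guaranteed to load on the same Python version.

```python
import io
import marshal
import pickle
import types

def _rebuild_function(code_bytes, name):
    # Recreate a function from marshalled bytecode.
    # Assumption: it only needs this module's globals and has no closure or defaults.
    return types.FunctionType(marshal.loads(code_bytes), globals(), name)

class ByValuePickler(pickle.Pickler):
    """Toy pickler that ships plain lambdas as bytecode instead of by name."""

    def reducer_override(self, obj):
        is_plain_lambda = (
            isinstance(obj, types.FunctionType)
            and obj.__name__ == "<lambda>"
            and obj.__closure__ is None
            and obj.__defaults__ is None
            and obj.__kwdefaults__ is None
        )
        if is_plain_lambda:
            # Reduce to "call _rebuild_function(code_bytes, name)" at load time.
            return _rebuild_function, (marshal.dumps(obj.__code__), obj.__name__)
        # Everything else (including _rebuild_function itself) uses the normal rules.
        return NotImplemented

def dumps_by_value(obj):
    buffer = io.BytesIO()
    ByValuePickler(buffer).dump(obj)
    return buffer.getvalue()

payload = dumps_by_value(lambda x: x ** 2)
# Plain pickle can load it: the payload only references _rebuild_function by name.
restored = pickle.loads(payload)
print(restored(7))  # 49
```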

This is very encouraging, particularly because I don't have access to the multiprocessing source code. Sadly, I still can't get this very basic example to work:

    import multiprocessing as mp
    import dill

    p = mp.Pool(4)
    print p.map(lambda x: x**2, range(10))

Why is this? What am I missing? Exactly what are the limitations on the multiprocessing+dill combination?
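For contrast, here is a Python 3 sketch of the same computation in a form the stock multiprocessing.Pool does accept: the lambda is replaced by a top-level function (square and main are illustrative names), so only a reference to it has to be pickled. The __main__ guard matters because the spawn and forkserver start methods re-import the script in every worker.

```python
import multiprocessing as mp

def square(x):
    # Top-level function: pickle sends it to the workers by name, not by value.
    return x ** 2

def main():
    # Leaving the with-block terminates the pool once map has returned.
    with mp.Pool(4) as pool:
        return pool.map(square, range(10))

if __name__ == "__main__":
    # Guard needed with the spawn/forkserver start methods, which re-import this module.
    print(main())  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```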

Temporary Edit for J.F. Sebastian

    mrockli@mrockli-notebook:~/workspace/toolz$ python testmp.py
    Exception in thread Thread-2:
    Traceback (most recent call last):
      File "/home/mrockli/Software/anaconda/lib/python2.7/threading.py", line 808, in __bootstrap_inner
        self.run()
      File "/home/mrockli/Software/anaconda/lib/python2.7/threading.py", line 761, in run
        self.__target(*self.__args, **self.__kwargs)
      File "/home/mrockli/Software/anaconda/lib/python2.7/multiprocessing/pool.py", line 342, in _handle_tasks
        put(task)
    PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

    ^C
    ...lots of junk...
    [DEBUG/MainProcess] cleaning up worker 3
    [DEBUG/MainProcess] cleaning up worker 2
    [DEBUG/MainProcess] cleaning up worker 1
    [DEBUG/MainProcess] cleaning up worker 0
    [DEBUG/MainProcess] added worker
    [DEBUG/MainProcess] added worker
    [INFO/PoolWorker-5] child process calling self.run()
    [INFO/PoolWorker-6] child process calling self.run()
    [DEBUG/MainProcess] added worker
    [INFO/PoolWorker-7] child process calling self.run()
    [DEBUG/MainProcess] added worker
    [INFO/PoolWorker-8] child process calling self.run()
MRocklin asked Nov 14 '13 17:11




1 Answer

multiprocessing makes some bad choices about pickling. Don't get me wrong, it makes some good choices that enable it to pickle certain types so they can be used in a pool's map function. However, since we have dill that can do the pickling, multiprocessing's own pickling becomes a bit limiting. Actually, if multiprocessing were to use pickle instead of cPickle... and also drop some of its own pickling overrides, then dill could take over and give a much more complete serialization for multiprocessing. That is also why your example fails: the changes dill makes to pickle on import are not seen by the cPickle-based machinery that multiprocessing uses to send tasks to its workers.

Until that happens, there's a fork of multiprocessing called pathos (the release version is a bit stale, unfortunately) that removes the above limitations. Pathos also adds some nice features that multiprocessing doesn't have, such as passing multiple arguments to the map function. Pathos is due for a release after some mild updating, mostly conversion to Python 3.x.
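On the multiple-arguments point: since Python 3.3 the standard library's Pool also has Pool.starmap, which takes an iterable of argument tuples rather than the separate sequences that pathos' map accepts. A minimal sketch (add and main are illustrative names):

```python
import multiprocessing as mp

def add(x, y):
    # Illustrative two-argument function.
    return x + y

def main():
    xs = [1, 2, 3]
    ys = [10, 20, 30]
    with mp.Pool(2) as pool:
        # starmap unpacks each tuple into positional arguments: add(1, 10), add(2, 20), add(3, 30).
        return pool.starmap(add, zip(xs, ys))

if __name__ == "__main__":
    print(main())  # [11, 22, 33]
```

The function still has to be picklable by the stock pickler, so this does not lift the lambda/closure restriction; it only covers the multi-argument convenience.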

    Python 2.7.5 (default, Sep 30 2013, 20:15:49)
    [GCC 4.2.1 (Apple Inc. build 5566)] on darwin
    Type "help", "copyright", "credits" or "license" for more information.
    >>> import dill
    >>> from pathos.multiprocessing import ProcessingPool
    >>> pool = ProcessingPool(nodes=4)
    >>> result = pool.map(lambda x: x**2, range(10))
    >>> result
    [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

And, just to show off a little of what pathos.multiprocessing can do:

    >>> def busy_add(x,y, delay=0.01):
    ...     for n in range(x):
    ...        x += n
    ...     for n in range(y):
    ...        y -= n
    ...     import time
    ...     time.sleep(delay)
    ...     return x + y
    ... 
    >>> def busy_squared(x):
    ...     import time, random
    ...     time.sleep(2*random.random())
    ...     return x*x
    ... 
    >>> def squared(x):
    ...     return x*x
    ... 
    >>> def quad_factory(a=1, b=1, c=0):
    ...     def quad(x):
    ...         return a*x**2 + b*x + c
    ...     return quad
    ... 
    >>> square_plus_one = quad_factory(2,0,1)
    >>> 
    >>> def test1(pool):
    ...     print pool
    ...     print "x: %s\n" % str(x)
    ...     print pool.map.__name__
    ...     start = time.time()
    ...     res = pool.map(squared, x)
    ...     print "time to results:", time.time() - start
    ...     print "y: %s\n" % str(res)
    ...     print pool.imap.__name__
    ...     start = time.time()
    ...     res = pool.imap(squared, x)
    ...     print "time to queue:", time.time() - start
    ...     start = time.time()
    ...     res = list(res)
    ...     print "time to results:", time.time() - start
    ...     print "y: %s\n" % str(res)
    ...     print pool.amap.__name__
    ...     start = time.time()
    ...     res = pool.amap(squared, x)
    ...     print "time to queue:", time.time() - start
    ...     start = time.time()
    ...     res = res.get()
    ...     print "time to results:", time.time() - start
    ...     print "y: %s\n" % str(res)
    ... 
    >>> def test2(pool, items=4, delay=0):
    ...     _x = range(-items/2,items/2,2)
    ...     _y = range(len(_x))
    ...     _d = [delay]*len(_x)
    ...     print map
    ...     res1 = map(busy_squared, _x)
    ...     res2 = map(busy_add, _x, _y, _d)
    ...     print pool.map
    ...     _res1 = pool.map(busy_squared, _x)
    ...     _res2 = pool.map(busy_add, _x, _y, _d)
    ...     assert _res1 == res1
    ...     assert _res2 == res2
    ...     print pool.imap
    ...     _res1 = pool.imap(busy_squared, _x)
    ...     _res2 = pool.imap(busy_add, _x, _y, _d)
    ...     assert list(_res1) == res1
    ...     assert list(_res2) == res2
    ...     print pool.amap
    ...     _res1 = pool.amap(busy_squared, _x)
    ...     _res2 = pool.amap(busy_add, _x, _y, _d)
    ...     assert _res1.get() == res1
    ...     assert _res2.get() == res2
    ...     print ""
    ... 
    >>> def test3(pool): # test against a function that should fail in pickle
    ...     print pool
    ...     print "x: %s\n" % str(x)
    ...     print pool.map.__name__
    ...     start = time.time()
    ...     res = pool.map(square_plus_one, x)
    ...     print "time to results:", time.time() - start
    ...     print "y: %s\n" % str(res)
    ... 
    >>> def test4(pool, maxtries, delay):
    ...     print pool
    ...     m = pool.amap(busy_add, x, x)
    ...     tries = 0
    ...     while not m.ready():
    ...         time.sleep(delay)
    ...         tries += 1
    ...         print "TRY: %s" % tries
    ...         if tries >= maxtries:
    ...             print "TIMEOUT"
    ...             break
    ...     print m.get()
    ... 
    >>> import time
    >>> x = range(18)
    >>> delay = 0.01
    >>> items = 20
    >>> maxtries = 20
    >>> from pathos.multiprocessing import ProcessingPool as Pool
    >>> pool = Pool(nodes=4)
    >>> test1(pool)
    <pool ProcessingPool(ncpus=4)>
    x: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17]

    map
    time to results: 0.0553691387177
    y: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]

    imap
    time to queue: 7.91549682617e-05
    time to results: 0.102381229401
    y: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]

    amap
    time to queue: 7.08103179932e-05
    time to results: 0.0489699840546
    y: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]

    >>> test2(pool, items, delay)
    <built-in function map>
    <bound method ProcessingPool.map of <pool ProcessingPool(ncpus=4)>>
    <bound method ProcessingPool.imap of <pool ProcessingPool(ncpus=4)>>
    <bound method ProcessingPool.amap of <pool ProcessingPool(ncpus=4)>>

    >>> test3(pool)
    <pool ProcessingPool(ncpus=4)>
    x: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17]

    map
    time to results: 0.0523059368134
    y: [1, 3, 9, 19, 33, 51, 73, 99, 129, 163, 201, 243, 289, 339, 393, 451, 513, 579]

    >>> test4(pool, maxtries, delay)
    <pool ProcessingPool(ncpus=4)>
    TRY: 1
    TRY: 2
    TRY: 3
    TRY: 4
    TRY: 5
    TRY: 6
    TRY: 7
    [0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34]
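For comparison with test3: if you are stuck with the stock multiprocessing.Pool, a common standard-library workaround for closures like the one quad_factory returns is functools.partial over a top-level function, because a partial object pickles as a reference to the function plus its bound arguments. This is a sketch in Python 3, not something pathos needs; quad and main are illustrative names, and it does nothing for lambdas.

```python
import functools
import multiprocessing as mp
import pickle

def quad(x, a=1, b=1, c=0):
    # Top-level counterpart of the closure built by quad_factory.
    return a * x ** 2 + b * x + c

# Pickles as (reference to quad, bound keyword arguments), so workers can rebuild it.
square_plus_one = functools.partial(quad, a=2, b=0, c=1)

def main():
    with mp.Pool(4) as pool:
        return pool.map(square_plus_one, range(18))

if __name__ == "__main__":
    # Round-trips through plain pickle, which the equivalent closure would not.
    restored = pickle.loads(pickle.dumps(square_plus_one))
    print(restored(3))  # 19
    print(main())
```

With the same coefficients as square_plus_one in the session above, main() should produce the same list that test3 printed.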
Mike McKerns answered Sep 22 '22 17:09
