I am using `pathos.multiprocessing` to parallelize a program that requires using instance methods. Here is a minimum working example:
```python
import time
import numpy as np
from pathos.multiprocessing import Pool, ProcessingPool, ThreadingPool

class dummy(object):
    def __init__(self, arg, key1=None, key2=-11):
        # Sleep a random 0-4 seconds to simulate work of varying length.
        np.random.seed(arg)
        randnum = np.random.randint(0, 5)
        print('Sleeping {} seconds'.format(randnum))
        time.sleep(randnum)
        self.value = arg
        self.more1 = key1
        self.more2 = key2

args = [0, 10, 20, 33, 82]
keys = ['key1', 'key2']
k1val = ['car', 'borg', 'syria', 'aurora', 'libera']
k2val = ['a', 'b', 'c', 'd', 'e']
# One dict of keyword arguments per call, e.g. {'key1': 'car', 'key2': 'a'}
allks = [dict(zip(keys, [k1val[i], k2val[i]])) for i in range(5)]

pool = ThreadingPool(4)
# The extra iterables are passed positionally: k1val -> key1, k2val -> key2
result = pool.map(dummy, args, k1val, k2val)
print([[r.value, r.more1, r.more2] for r in result])
```
The result printed is (as expected):

```text
Sleeping 4 seconds
Sleeping 1 seconds
Sleeping 3 seconds
Sleeping 4 seconds
Sleeping 3 seconds
[[0, 'car', 'a'], [10, 'borg', 'b'], [20, 'syria', 'c'], [33, 'aurora', 'd'], [82, 'libera', 'e']]
```
However, in this call to `map` the order of the last two arguments matters. If I do:

```python
result2 = pool.map(dummy, args, k2val, k1val)
```

I obtain:

```text
[[0, 'a', 'car'], [10, 'b', 'borg'], [20, 'c', 'syria'], [33, 'd', 'aurora'], [82, 'e', 'libera']]
```
whereas I would like to obtain the same as the first result. What I want is the behaviour of the `kwds` argument of `apply_async` in the standard `multiprocessing` module, but for `map`: pass a list of dictionaries, where in each dictionary the keys are the keyword names and the values are the keyword arguments (see `allks`). Notice that the standard `multiprocessing` module cannot use instance methods, and therefore does not meet even the minimum requirements.

Tentatively, this would be:

```python
result = pool.map(dummy, args, kwds=allks)  # This does not work
```
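The swap in the second result happens because `map` with several iterables binds them strictly by position, the same way Python's built-in `map` does. A minimal sketch with the built-in `map`, where `make` is a hypothetical stand-in for `dummy`:

```python
# Hypothetical stand-in for dummy: one positional parameter plus two keywords.
def make(arg, key1=None, key2=-11):
    return [arg, key1, key2]

args = [0, 10]
k1val = ['car', 'borg']
k2val = ['a', 'b']

# The extra iterables fill the parameters after `arg` from left to right,
# so their order alone decides which list lands in key1 and which in key2.
in_order = list(map(make, args, k1val, k2val))
swapped = list(map(make, args, k2val, k1val))
print(in_order)  # [[0, 'car', 'a'], [10, 'borg', 'b']]
print(swapped)   # [[0, 'a', 'car'], [10, 'b', 'borg']]
```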
I'm the `pathos` author. Yes, you've hit on something that I have known needs a little work. Currently, the `map` and `pipe` (i.e. `apply`) methods of `ProcessPool`, `ThreadPool`, and `ParallelPool` cannot take `kwds`; you have to pass them as `args`. However, if you use `_ProcessPool` or `_ThreadPool`, then you can pass `kwds` to their `apply` and `apply_async` methods (their `map` follows the `multiprocessing` signature, which has no `kwds` parameter).
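Until that is fixed, one way to pass keyword arguments "as args" is a small wrapper that receives each keyword dictionary as an ordinary positional argument and unpacks it with `**`, so the values bind by name rather than by position. A minimal sketch under these assumptions: it uses the standard library's `multiprocessing.pool.ThreadPool.starmap` (Python 3.3+) instead of pathos, and `call_with_kwds` and `Dummy` (a simplified `dummy` without the random sleep) are hypothetical names.

```python
from functools import partial
from multiprocessing.pool import ThreadPool

class Dummy(object):
    """Hypothetical, simplified version of the question's dummy class."""
    def __init__(self, arg, key1=None, key2=-11):
        self.value = arg
        self.more1 = key1
        self.more2 = key2

def call_with_kwds(func, arg, kwds):
    """Hypothetical helper: call func with one positional arg and a dict of keywords."""
    return func(arg, **kwds)

args = [0, 10, 20, 33, 82]
keys = ['key1', 'key2']
k1val = ['car', 'borg', 'syria', 'aurora', 'libera']
k2val = ['a', 'b', 'c', 'd', 'e']
allks = [dict(zip(keys, pair)) for pair in zip(k1val, k2val)]

pool = ThreadPool(4)
try:
    # partial fixes `func`; starmap unpacks each (arg, kwds) tuple into the
    # remaining two parameters and returns results in input order.
    result = pool.starmap(partial(call_with_kwds, Dummy), zip(args, allks))
finally:
    pool.close()
    pool.join()

print([[r.value, r.more1, r.more2] for r in result])
# [[0, 'car', 'a'], [10, 'borg', 'b'], [20, 'syria', 'c'], [33, 'aurora', 'd'], [82, 'libera', 'e']]
```

With a pathos pool, whose `map` accepts several iterables as in the question, the analogous call should presumably be `pool.map(partial(call_with_kwds, dummy), args, allks)`.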
The pools in `pathos.pools` whose names start with an underscore come directly from `multiprocess`, so they have the same API as those in `multiprocessing` (but with better serialization, so they can handle class methods, etc.).
```python
>>> from pathos.pools import _ProcessPool
>>> from multiprocess.pool import Pool
>>> Pool is _ProcessPool
True
```
So the edits to the original code would look something like this (from the OP's suggested edit):
```python
>>> from pathos.pools import _ThreadPool
>>> pool = _ThreadPool(4)
>>>
[…]
>>> result = []
>>> def callback(x):
...     result.append(x)
...
>>> for a, k in zip(args, allks):
...     pool.apply_async(dummy, args=(a,), kwds=k, callback=callback)
...
>>> pool.close()
>>> pool.join()
```
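Note that `callback` runs as each task finishes, so with the random sleeps in `dummy` the entries of `result` arrive in completion order, not in the order of `args`. If the original order matters, one option is to keep the `AsyncResult` objects that `apply_async` returns and call `.get()` on each. A minimal sketch, assuming the standard library's `multiprocessing.pool.ThreadPool` as a stand-in for `_ThreadPool` (whose API mirrors `multiprocessing`); the simplified `Dummy` class with a fixed sleep is hypothetical.

```python
import time
from multiprocessing.pool import ThreadPool

class Dummy(object):
    """Hypothetical, simplified version of the question's dummy class."""
    def __init__(self, arg, key1=None, key2=-11):
        # Make the first task the slowest so that completion order
        # differs from submission order.
        time.sleep(0.2 if arg == 0 else 0.0)
        self.value = arg
        self.more1 = key1
        self.more2 = key2

args = [0, 10, 20, 33, 82]
keys = ['key1', 'key2']
k1val = ['car', 'borg', 'syria', 'aurora', 'libera']
k2val = ['a', 'b', 'c', 'd', 'e']
allks = [dict(zip(keys, pair)) for pair in zip(k1val, k2val)]

completed = []  # filled by the callback, in completion order
pool = ThreadPool(4)
# Keep each AsyncResult so results can be read back in submission order.
pending = [pool.apply_async(Dummy, args=(a,), kwds=k, callback=completed.append)
           for a, k in zip(args, allks)]
pool.close()
pool.join()

ordered = [p.get() for p in pending]  # same order as args
print([[r.value, r.more1, r.more2] for r in ordered])
# [[0, 'car', 'a'], [10, 'borg', 'b'], [20, 'syria', 'c'], [33, 'aurora', 'd'], [82, 'libera', 'e']]
print([r.value for r in completed])  # typically has 0 last
```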