How to pass keywords list to pathos.multiprocessing?

I am using pathos.multiprocessing to parallelize a program that requires instance methods. Here is a minimal working example:

import time
import numpy as np
from pathos.multiprocessing import Pool, ProcessingPool, ThreadingPool

class dummy(object):
    def __init__(self, arg, key1=None, key2=-11):

        np.random.seed(arg)

        randnum = np.random.randint(0, 5)

        print('Sleeping {} seconds'.format(randnum))
        time.sleep(randnum)

        self.value = arg
        self.more1 = key1
        self.more2 = key2

args = [0, 10, 20, 33, 82] 
keys = ['key1', 'key2']
k1val = ['car', 'borg', 'syria', 'aurora', 'libera']
k2val = ['a', 'b', 'c', 'd', 'e']
allks = [dict(zip(keys, [k1val[i], k2val[i]])) for i in range(5)]

pool = ThreadingPool(4)
result = pool.map(dummy, args, k1val, k2val)

print([[r.value, r.more1, r.more2] for r in result])

The result printed is (as expected):

Sleeping 4 seconds
Sleeping 1 seconds
Sleeping 3 seconds
Sleeping 4 seconds
Sleeping 3 seconds
[[0, 'car', 'a'], [10, 'borg', 'b'], [20, 'syria', 'c'], [33, 'aurora', 'd'], [82, 'libera', 'e']]

However, in this call to map the order of the last two arguments matters, and if I do:

result2 = pool.map(dummy, args, k2val, k1val)

I obtain:

[[0, 'a', 'car'], [10, 'b', 'borg'], [20, 'c', 'syria'], [33, 'd', 'aurora'], [82, 'e', 'libera']]

whereas I would like to obtain the same as the first result. The behaviour I want is what the kwds argument of apply_async does in the standard multiprocessing module: pass a list of dictionaries, where in each dictionary the keys are the keyword names and the values are the keyword arguments (see allks). Note that the standard multiprocessing module cannot handle instance methods, and therefore does not meet even the minimum requirements.
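For reference, here is a minimal sketch of that apply_async/kwds pattern using only the standard library. It assumes `multiprocessing.pool.ThreadPool` (threads, so nothing needs to be pickled) and a hypothetical, sleep-free class `Dummy` standing in for `dummy`. Each dictionary in `allks` is passed as `kwds`, and calling `.get()` on the returned handles in submission order keeps the results aligned with `args`.

```python
from multiprocessing.pool import ThreadPool


class Dummy(object):
    """Hypothetical stand-in for `dummy`, without the random sleep."""

    def __init__(self, arg, key1=None, key2=-11):
        self.value = arg
        self.more1 = key1
        self.more2 = key2


args = [0, 10, 20, 33, 82]
k1val = ['car', 'borg', 'syria', 'aurora', 'libera']
k2val = ['a', 'b', 'c', 'd', 'e']
# One dict of keyword arguments per call; the key order inside each dict is irrelevant.
allks = [{'key2': b, 'key1': a} for a, b in zip(k1val, k2val)]

pool = ThreadPool(4)
# Submit one task per (positional arg, keyword dict) pair.
handles = [pool.apply_async(Dummy, args=(a,), kwds=k) for a, k in zip(args, allks)]
# .get() blocks until each task is done; iterating in submission order keeps results aligned with args.
result = [h.get() for h in handles]
pool.close()
pool.join()

print([[r.value, r.more1, r.more2] for r in result])
```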

Tentatively, this would be:

result = pool.map(dummy, args, kwds=allks)  # This does not work
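Until map accepts kwds, one workaround is to map a small helper that unpacks each dictionary itself, so the keyword names travel with their values and the order of the lists no longer matters. The sketch below uses a hypothetical helper `call_with_kwds` and a hypothetical sleep-free `Dummy`, and runs on the standard library's `ThreadPool.starmap` so it is self-contained. With pathos, the same helper should plug into the multi-iterable form of map used above, e.g. `pool.map(call_with_kwds, [dummy] * len(args), args, allks)`, though that line is untested.

```python
from multiprocessing.pool import ThreadPool


class Dummy(object):
    """Hypothetical stand-in for `dummy` (no random sleep)."""

    def __init__(self, arg, key1=None, key2=-11):
        self.value = arg
        self.more1 = key1
        self.more2 = key2


def call_with_kwds(func, arg, kwds):
    """Hypothetical helper: call func with one positional arg plus a dict of keyword args."""
    return func(arg, **kwds)


args = [0, 10, 20, 33, 82]
allks = [{'key1': 'car', 'key2': 'a'}, {'key1': 'borg', 'key2': 'b'},
         {'key1': 'syria', 'key2': 'c'}, {'key1': 'aurora', 'key2': 'd'},
         {'key1': 'libera', 'key2': 'e'}]

pool = ThreadPool(4)
# starmap unpacks each tuple into call_with_kwds(func, arg, kwds) and returns results in input order.
result = pool.starmap(call_with_kwds, [(Dummy, a, k) for a, k in zip(args, allks)])
pool.close()
pool.join()

print([[r.value, r.more1, r.more2] for r in result])
```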

astabada, asked Mar 11 '23 00:03


1 Answer

I'm the pathos author. Yeah, you hit on something that I have known needs a little work. Currently, the map and pipe (i.e. apply) methods of ProcessPool, ThreadPool, and ParallelPool cannot take kwds; you have to pass them as args. However, if you use _ProcessPool or _ThreadPool, then you can pass kwds to their apply methods (apply and apply_async). The pools in pathos.pools whose names start with an underscore come directly from multiprocess, so they have identical APIs to those in multiprocessing (but with better serialization, so they can pass class methods, etc.).

>>> from pathos.pools import _ProcessPool
>>> from multiprocess.pool import Pool
>>> Pool is _ProcessPool
True

So the edits to the original code would look something like this (from the OP's suggested edit):

>>> from pathos.pools import _ThreadPool
>>> pool = _ThreadPool(4)
>>> 
[…]
>>> result = []
>>> def callback(x):
...     result.append(x)
...
>>> for a, k in zip(args, allks):
...     pool.apply_async(dummy, args=(a,), kwds=k, callback=callback)
...
>>> pool.close()
>>> pool.join()
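One caveat with the callback version: callback runs as each task finishes, so result is filled in completion order, which with the random sleeps in dummy need not match args. A minimal sketch of one way to keep the original order is to preallocate the list and give each task a callback that writes to its own slot. It assumes the standard library's `multiprocessing.pool.ThreadPool` as a stand-in for _ThreadPool (which, per the answer, shares the multiprocessing API), plus a hypothetical sleep-free `Dummy` and a hypothetical `make_callback` factory.

```python
from multiprocessing.pool import ThreadPool


class Dummy(object):
    """Hypothetical stand-in for `dummy` (no random sleep)."""

    def __init__(self, arg, key1=None, key2=-11):
        self.value = arg
        self.more1 = key1
        self.more2 = key2


args = [0, 10, 20, 33, 82]
allks = [dict(key1=a, key2=b) for a, b in
         zip(['car', 'borg', 'syria', 'aurora', 'libera'], ['a', 'b', 'c', 'd', 'e'])]

result = [None] * len(args)  # one slot per submitted task


def make_callback(i):
    # Bind the slot index now, so each callback writes to its own position.
    def callback(x):
        result[i] = x
    return callback


pool = ThreadPool(4)
for i, (a, k) in enumerate(zip(args, allks)):
    pool.apply_async(Dummy, args=(a,), kwds=k, callback=make_callback(i))
pool.close()
pool.join()  # all pending results (and their callbacks) are handled before join() returns

print([[r.value, r.more1, r.more2] for r in result])
```

Collecting the AsyncResult handles and calling .get() on them in submission order is an equally valid way to preserve order without callbacks.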
Mike McKerns, answered Mar 19 '23 22:03