Python multiprocessing pool - iterating over objects methods?

Perhaps someone more fluent in Python's multiprocessing Pool code could help me out. I am trying to connect to several hosts on my network simultaneously (N at any one time) over a socket connection and execute some RPCs. As one host finishes, I want to add the next host into the Pool to run until all are complete.

I have a class, HClass, with some methods to do so, and a list of hostnames contained in hostlist. But I am failing to grok any of the docs.python.org examples for Pool to get this working.

A short snippet of code to illustrate what I've got so far:

hostlist = [h1, h2, h3, h4, ....]
poolsize = 2

class HClass:
  def __init__(self, hostname="default"):
    self.hostname = hostname

  def go(self):
      # do stuff
      # do more stuff
  ....

if __name__ == "__main__":
  objs = [HClass(hostname=current_host) for current_host in hostlist]
  pool = multiprocessing.pool(poolsize)
  results = pool.apply_async(objs.go())

So far I am blessed with this traceback:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'generator'>: attribute lookup __builtin__.generator failed

The process then just hangs until I Control-C out of it.

jfofo asked Jan 02 '13 03:01


3 Answers

I would try to keep interprocess communication down to a minimum. It looks like all you really need to send is the hostname string:

for host in hostlist:
    pool.apply_async(worker, args = (host,), callback = on_return)

For example,

import multiprocessing as mp
import time
import logging

logger = mp.log_to_stderr(logging.INFO)

hostlist = ['h1', 'h2', 'h3', 'h4']*3
poolsize = 2

class HClass:
    def __init__(self, hostname="default"):
        self.hostname = hostname

    def go(self):
        logger.info('processing {h}'.format(h = self.hostname))
        time.sleep(1)
        return self.hostname

def worker(host):
    h = HClass(hostname = host)
    return h.go()

result = []
def on_return(retval):
    result.append(retval)

if __name__ == "__main__":
    pool = mp.Pool(poolsize)
    for host in hostlist:
        pool.apply_async(worker, args = (host,), callback = on_return)
    pool.close()
    pool.join()
    logger.info(result)
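
If you would rather not manage a callback, a minimal sketch of the same idea with imap_unordered could look like the following. The worker body and its short sleep are hypothetical stand-ins for the real RPC calls, and run_all is only an illustrative helper name, not something from the question.

```python
import multiprocessing as mp
import time


def worker(host):
    # Hypothetical stand-in for the real RPC work. Only the hostname
    # string crosses the process boundary, which keeps pickling trivial.
    time.sleep(0.1)
    return host


def run_all(hosts, poolsize=2):
    # At most `poolsize` hosts are processed at any one time; as soon as
    # a worker process is free it picks up the next host in the list.
    pool = mp.Pool(poolsize)
    try:
        # imap_unordered yields each result as soon as its task finishes,
        # so the returned order may differ from the input order.
        return list(pool.imap_unordered(worker, hosts))
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    print(sorted(run_all(['h1', 'h2', 'h3', 'h4'])))
```

The results come back in completion order rather than input order, so sort them or return the hostname along with the payload if you need to match results to hosts.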

unutbu answered Nov 14 '22 22:11


I think this is the same question as "Can't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map()".

Copied from the answers in the above link. The problem is that multiprocessing must pickle things to sling them among processes, and bound methods are not picklable.

One approach is to make go an unbound function, for example by moving it out of the class. Another is to make the method picklable with copy_reg (named copyreg in Python 3).
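
A minimal sketch of the first approach, assuming the HClass instances themselves are picklable: instead of moving go out of the class, a small module-level helper (call_go, a hypothetical name) calls the method inside the worker process. The body of go here is a placeholder.

```python
import multiprocessing as mp


class HClass(object):
    def __init__(self, hostname="default"):
        self.hostname = hostname

    def go(self):
        # Placeholder for the real work done against this host.
        return self.hostname


def call_go(obj):
    # A module-level function is pickled by name, so the pool can send it
    # to a worker. The HClass instance travels as its pickled argument.
    return obj.go()


if __name__ == "__main__":
    objs = [HClass(hostname=h) for h in ['h1', 'h2', 'h3', 'h4']]
    pool = mp.Pool(2)
    # map blocks until every object has been processed and returns the
    # results in the same order as objs.
    results = pool.map(call_go, objs)
    pool.close()
    pool.join()
    print(results)
```

This only works if the instance can be pickled. An object that already holds an open socket may not be, so it is probably safer to open the connection inside go rather than in __init__.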

jinghli answered Nov 14 '22 20:11


I agree with @unutbu's solution... simpler is better. However, if you did have to send the instance method go, I'd use pathos.multiprocessing instead of multiprocessing.

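>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
...   def plus(self, x, y):
...     return x+y
...
>>> t = Test()
>>> # example inputs, assumed here; chosen to match the output shown
>>> x = [0, 1, 2, 3]
>>> y = [4, 5, 6, 7]
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]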

Get the code here: https://github.com/uqfoundation/pathos

Mike McKerns answered Nov 14 '22 20:11