Perhaps someone more fluent in Python's multiprocessing Pool code could help me out. I am trying to connect to several hosts on my network simultaneously (N at any one time) over a socket connection and execute some RPCs. As one host finishes, I want to add the next host into the Pool to run until all are complete.
I have a class, HClass, with some methods to do so, and a list of hostnames contained in hostlist. But I am failing to grok any of the docs.python.org examples for Pool to get this working.
A short snippet of code to illustrate what I've got so far:
import multiprocessing

hostlist = [h1, h2, h3, h4, ....]
poolsize = 2

class HClass:
    def __init__(self, hostname="default"):
        self.hostname = hostname

    def go(self):
        # do stuff
        # do more stuff
        ....

if __name__ == "__main__":
    objs = [HClass(hostname=current_host) for current_host in hostlist]
    pool = multiprocessing.Pool(poolsize)
    results = pool.apply_async(objs.go())
So far I am blessed with this traceback:
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'generator'>: attribute lookup __builtin__.generator failed
After that, the process just hangs until I Control-C out of it.
I would try to keep interprocess communication down to a minimum. It looks like all you really need to send is the hostname string:
for host in hostlist:
    pool.apply_async(worker, args=(host,), callback=on_return)
For example,
import multiprocessing as mp
import time
import logging

logger = mp.log_to_stderr(logging.INFO)

hostlist = ['h1', 'h2', 'h3', 'h4'] * 3
poolsize = 2

class HClass:
    def __init__(self, hostname="default"):
        self.hostname = hostname

    def go(self):
        logger.info('processing {h}'.format(h=self.hostname))
        time.sleep(1)
        return self.hostname

def worker(host):
    h = HClass(hostname=host)
    return h.go()

result = []

def on_return(retval):
    result.append(retval)

if __name__ == "__main__":
    pool = mp.Pool(poolsize)
    for host in hostlist:
        pool.apply_async(worker, args=(host,), callback=on_return)
    pool.close()
    pool.join()
    logger.info(result)
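If you don't need a per-task callback, the same pattern can be written more compactly with Pool.map, which blocks until every host has been processed and returns the results in input order. This variant is my own illustration, not part of the original answer; it reuses the worker and hostlist defined above:

if __name__ == "__main__":
    pool = mp.Pool(poolsize)
    # map blocks until all hosts finish; results come back in hostlist order
    results = pool.map(worker, hostlist)
    pool.close()
    pool.join()
    logger.info(results)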
I think this is the same question as Can't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map().
Copied from the answers in the above link: the problem is that multiprocessing must pickle things to sling them among processes, and bound methods are not picklable.
One approach is to make go an unbound (module-level) function, i.e. move it out of the class. Another is to make bound methods picklable by registering them with copy_reg, as sketched below.
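For completeness, here is a sketch of the copy_reg route for Python 2, based on the well-known recipe from the linked question; treat it as illustrative rather than verbatim from any answer here:

import copy_reg
import types

def _pickle_method(method):
    # decompose a bound method into picklable parts
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    # walk the MRO to find the original function, then re-bind it
    for klass in cls.mro():
        try:
            func = klass.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

# teach pickle how to handle bound methods
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

With this registered, something like pool.apply_async(obj.go) can be submitted directly, because the bound method can now cross the process boundary.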
I agree with @unutbu's solution... simpler is better. However, if you did have to send the class method go, I'd use pathos.multiprocessing instead of multiprocessing.
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
...     def plus(self, x, y):
...         return x + y
...
>>> t = Test()
>>> x = [0, 1, 2, 3]
>>> y = [4, 5, 6, 7]
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
Get the code here: https://github.com/uqfoundation/pathos
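Applied to the question's setup, the HClass instances can be shipped to the workers directly, because pathos serializes with dill, which handles bound methods and lambdas. A hypothetical adaptation (my sketch, not from the original answer):

from pathos.multiprocessing import ProcessingPool as Pool

hostlist = ['h1', 'h2', 'h3', 'h4']
objs = [HClass(hostname=h) for h in hostlist]
pool = Pool(2)
# dill can serialize the lambda and the bound go method
results = pool.map(lambda obj: obj.go(), objs)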