To get the return value of a function passed to Python multiprocessing. Process, we can use the manager. dict method to create a shared variable. to create a Process object with the target set to the worker function that runs for each process.
It works like a map-reduce architecture. It maps the input to the different processors and collects the output from all the processors. After the execution of code, it returns the output in form of a list or array. It waits for all the tasks to finish and then returns the output.
multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads.
Use shared variable to communicate. For example like this:
import multiprocessing
def worker(procnum, return_dict):
"""worker function"""
print(str(procnum) + " represent!")
return_dict[procnum] = procnum
if __name__ == "__main__":
manager = multiprocessing.Manager()
return_dict = manager.dict()
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i, return_dict))
jobs.append(p)
p.start()
for proc in jobs:
proc.join()
print(return_dict.values())
I think the approach suggested by @sega_sai is the better one. But it really needs a code example, so here goes:
import multiprocessing
from os import getpid
def worker(procnum):
print('I am number %d in process %d' % (procnum, getpid()))
return getpid()
if __name__ == '__main__':
pool = multiprocessing.Pool(processes = 3)
print(pool.map(worker, range(5)))
Which will print the return values:
I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]
If you are familiar with map
(the Python 2 built-in) this should not be too challenging. Otherwise have a look at sega_Sai's link.
Note how little code is needed. (Also note how processes are re-used).
For anyone else who is seeking how to get a value from a Process
using Queue
:
import multiprocessing
ret = {'foo': False}
def worker(queue):
ret = queue.get()
ret['foo'] = True
queue.put(ret)
if __name__ == '__main__':
queue = multiprocessing.Queue()
queue.put(ret)
p = multiprocessing.Process(target=worker, args=(queue,))
p.start()
p.join()
print(queue.get()) # Prints {"foo": True}
Note that in Windows or Jupyter Notebook, with multithreading
you have to save this as a file and execute the file. If you do it in a command prompt you will see an error like this:
AttributeError: Can't get attribute 'worker' on <module '__main__' (built-in)>
For some reason, I couldn't find a general example of how to do this with Queue
anywhere (even Python's doc examples don't spawn multiple processes), so here's what I got working after like 10 tries:
def add_helper(queue, arg1, arg2): # the func called in child processes
ret = arg1 + arg2
queue.put(ret)
def multi_add(): # spawns child processes
q = Queue()
processes = []
rets = []
for _ in range(0, 100):
p = Process(target=add_helper, args=(q, 1, 2))
processes.append(p)
p.start()
for p in processes:
ret = q.get() # will block
rets.append(ret)
for p in processes:
p.join()
return rets
Queue
is a blocking, thread-safe queue that you can use to store the return values from the child processes. So you have to pass the queue to each process. Something less obvious here is that you have to get()
from the queue before you join
the Process
es or else the queue fills up and blocks everything.
Update for those who are object-oriented (tested in Python 3.4):
from multiprocessing import Process, Queue
class Multiprocessor():
def __init__(self):
self.processes = []
self.queue = Queue()
@staticmethod
def _wrapper(func, queue, args, kwargs):
ret = func(*args, **kwargs)
queue.put(ret)
def run(self, func, *args, **kwargs):
args2 = [func, self.queue, args, kwargs]
p = Process(target=self._wrapper, args=args2)
self.processes.append(p)
p.start()
def wait(self):
rets = []
for p in self.processes:
ret = self.queue.get()
rets.append(ret)
for p in self.processes:
p.join()
return rets
# tester
if __name__ == "__main__":
mp = Multiprocessor()
num_proc = 64
for _ in range(num_proc): # queue up multiple tasks running `sum`
mp.run(sum, [1, 2, 3, 4, 5])
ret = mp.wait() # get all results
print(ret)
assert len(ret) == num_proc and all(r == 15 for r in ret)
This example shows how to use a list of multiprocessing.Pipe instances to return strings from an arbitrary number of processes:
import multiprocessing
def worker(procnum, send_end):
'''worker function'''
result = str(procnum) + ' represent!'
print result
send_end.send(result)
def main():
jobs = []
pipe_list = []
for i in range(5):
recv_end, send_end = multiprocessing.Pipe(False)
p = multiprocessing.Process(target=worker, args=(i, send_end))
jobs.append(p)
pipe_list.append(recv_end)
p.start()
for proc in jobs:
proc.join()
result_list = [x.recv() for x in pipe_list]
print result_list
if __name__ == '__main__':
main()
Output:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!']
This solution uses fewer resources than a multiprocessing.Queue which uses
or a multiprocessing.SimpleQueue which uses
It is very instructive to look at the source for each of these types.
You can use the exit
built-in to set the exit code of a process. It can be obtained from the exitcode
attribute of the process:
import multiprocessing
def worker(procnum):
print str(procnum) + ' represent!'
exit(procnum)
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
jobs.append(p)
p.start()
result = []
for proc in jobs:
proc.join()
result.append(proc.exitcode)
print result
Output:
0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With