I have a main file that launches multiple processes and one of the processes again launches multiple processes. I am having problems launching the nested set of processes.
I have the following code in one file:
# parallel_test.py
import Queue
import multiprocessing
import time
import threading

def worker(q):
    while not q.empty():
        try:
            row = q.get(False)
            print row
            time.sleep(1)
        except Queue.Empty:
            break

def main():
    print 'creating queue'
    q = multiprocessing.Queue()
    print 'enqueuing'
    for i in range(100):
        q.put(i)
    num_processes = 15
    pool = []
    for i in range(num_processes):
        print 'launching process {0}'.format(i)
        p = multiprocessing.Process(target=worker, args=(q,))
        p.start()
        pool.append(p)
    for p in pool:
        p.join()

if __name__ == '__main__':
    main()
Running this file on its own (python parallel_test.py) works fine and prints the numbers as expected. But launching it as a Process from another file causes problems. My main file:
# main_loop_test.py
import parallel_test
from multiprocessing import Pool
import time

def main():
    targets = [parallel_test.main]
    running = True
    while running:
        try:
            p = Pool(12)
            for target in targets:
                p.apply_async(target)
            p.close()  # For some reason you need to run close() before join()
            p.join()   # Wait for all the steps to be done
            print 'All steps done'
            time.sleep(2)
        except KeyboardInterrupt as e:
            print "<<<<<<<<<<<<<<<<<<CAUGHT KEYBOARD INTERRUPT FROM USER>>>>>>>>>>>>>>>>>>>"
            running = False

if __name__ == '__main__':
    main()
parallel_test.py seems to launch one process (which does nothing) and then exit, after which main_loop_test.py prints 'All steps done'. No numbers are ever printed. Output:
creating queue
enqueuing
launching process 0
All steps done
creating queue
enqueuing
launching process 0
All steps done
What's going wrong? I get the same problem using Pool instead of managing the processes myself in parallel_test.py. Replacing multiprocessing with threading works, though.
You are not able to create child processes from parallel_test when you invoke it as a child process from another program, because the worker is created as a daemonic process, and, as documented at https://docs.python.org/2/library/multiprocessing.html, a daemonic process is not allowed to create child processes. You have to create the process as non-daemonic by setting its daemon property to False, as below.
p = multiprocessing.Process(target=test.main)
p.daemon = False
p.start()
p.join()
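To see the daemon flag at work, you can inspect it from inside a Pool worker. A minimal sketch (the helper names are mine, it assumes Python 3 where multiprocessing.get_context is available, and the fork start method is pinned only to keep the demo self-contained on POSIX):

```python
import multiprocessing

def report_daemon(_):
    # executed inside a Pool worker: Pool marks its workers as daemonic
    return multiprocessing.current_process().daemon

def pool_worker_is_daemonic():
    # pin the fork start method so the demo stays self-contained on POSIX
    pool = multiprocessing.get_context("fork").Pool(1)
    try:
        return pool.map(report_daemon, [None])[0]
    finally:
        pool.close()
        pool.join()

if __name__ == "__main__":
    print(pool_worker_is_daemonic())  # True: this is why the worker cannot spawn children
```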
I am not sure how to set the daemon property when you create child processes through the Pool module. You could try passing it through the initializer argument.
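One commonly cited workaround (not from the original answer; it assumes Python 3, where multiprocessing.get_context exists, and pins the fork start method to stay self-contained on POSIX) is to subclass Pool so its workers are never actually marked daemonic; a sketch:

```python
import multiprocessing
import multiprocessing.pool

class NoDaemonProcess(multiprocessing.Process):
    # a Process whose daemon flag always reads False; Pool's attempt to
    # mark its workers daemonic (w.daemon = True) is silently ignored
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        pass

# subclass the fork context so Pool builds its workers from NoDaemonProcess
class NoDaemonContext(type(multiprocessing.get_context("fork"))):
    Process = NoDaemonProcess

class NestablePool(multiprocessing.pool.Pool):
    # a Pool whose workers are non-daemonic and so may spawn children
    def __init__(self, *args, **kwargs):
        kwargs["context"] = NoDaemonContext()
        super(NestablePool, self).__init__(*args, **kwargs)

def square(x):
    return x * x

def sum_of_squares(n):
    # runs inside a NestablePool worker and launches its own child pool,
    # which a plain multiprocessing.Pool worker would refuse with
    # "daemonic processes are not allowed to have children"
    child = multiprocessing.get_context("fork").Pool(2)
    try:
        return sum(child.map(square, range(n)))
    finally:
        child.close()
        child.join()

if __name__ == "__main__":
    pool = NestablePool(2)
    try:
        print(pool.map(sum_of_squares, [3, 4]))  # [5, 14]
    finally:
        pool.close()
        pool.join()
```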
Do you mean to use hierarchical parallelism through the multiprocessing module, as in this example, which executes a blocking processing map inside an asynchronous processing map?
>>> def squared(x):
... return x**2
...
>>> def triple(x):
... return 3*x
...
>>> from pathos.multiprocessing import ProcessingPool as PPool
>>> res = PPool().amap(triple, PPool().map(squared, xrange(10)))
>>> res.get()
[0, 3, 12, 27, 48, 75, 108, 147, 192, 243]
I'm using the pathos fork of multiprocessing, as it is a bit easier to use than the standard library version.
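For comparison, the same blocking-map-feeding-an-async-map pattern can be sketched with the standard library alone (not part of the original answer; it assumes Python 3 and pins the fork start method to stay self-contained on POSIX):

```python
import multiprocessing

def squared(x):
    return x ** 2

def triple(x):
    return 3 * x

def blocking_then_async():
    # the inner map blocks; its result feeds an asynchronous outer map,
    # mirroring PPool().amap(triple, PPool().map(squared, xrange(10)))
    pool = multiprocessing.get_context("fork").Pool()
    try:
        res = pool.map_async(triple, pool.map(squared, range(10)))
        return res.get()
    finally:
        pool.close()
        pool.join()

if __name__ == "__main__":
    print(blocking_then_async())  # [0, 3, 12, 27, 48, 75, 108, 147, 192, 243]
```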
Also see another example here: https://stackoverflow.com/questions/28203774/how-to-do-hierarchical-parallelism-in-ipython-parallel
EDIT: The above question was deleted... so I'm adding the contents formerly at the link here. Here's how to do nested parallelism using two different types of pools.
>>> from pathos.multiprocessing import ProcessingPool, ThreadingPool
>>> # build a non-blocking processing pool map (i.e. async_map)
>>> amap = ProcessingPool().amap
>>> # build a blocking thread pool map
>>> tmap = ThreadingPool().map
>>>
>>> # define an 'inner' function
>>> def g(x):
... import random
... return int(x * random.random())
...
>>> # parallelize the inner function
>>> def h(x):
... return sum(tmap(g, x))
...
>>> # define the 'outer' function
>>> def f(x,y):
... return x*y
...
>>> # define two lists of different lengths
>>> x = range(10)
>>> y = range(5)
>>>
>>> # evaluate in nested parallel (done several times, for effect)
>>> res1 = amap(f, [h(x),h(x),h(x),h(x),h(x)], y)
>>> res2 = amap(f, [h(x),h(x),h(x),h(x),h(x)], y)
>>> res3 = amap(f, [h(x),h(x),h(x),h(x),h(x)], y)
>>> res4 = amap(f, [h(x),h(x),h(x),h(x),h(x)], y)
>>> res1.get()
[0, 15, 34, 57, 64]
>>> res2.get()
[0, 21, 36, 57, 64]
>>> res3.get()
[0, 10, 40, 51, 68]
>>> res4.get()
[0, 28, 22, 39, 116]
Note, I intentionally did not use [h(x)]*len(y), as that would call h only once; however, it could also be called like this:
>>> def _f(m, g, x, y):
... return sum(m(g, x)) * y
...
>>> amap(_f, [tmap]*len(y), [g]*len(y), [x]*len(y), y).get()
[0, 26, 42, 57, 68]
>>> amap(_f, [tmap]*len(y), [g]*len(y), [x]*len(y), y).get()
[0, 20, 50, 78, 92]
>>> amap(_f, [tmap]*len(y), [g]*len(y), [x]*len(y), y).get()
[0, 12, 20, 51, 92]
You can get pathos
here: https://github.com/uqfoundation