
Launching nested processes in multiprocessing

I have a main file that launches multiple processes and one of the processes again launches multiple processes. I am having problems launching the nested set of processes.

I have the following code in one file:

# parallel_test.py
import Queue
import multiprocessing
import time
import threading


def worker(q):
    while not q.empty():
        try:
            row = q.get(False)
            print row

            time.sleep(1)

        except Queue.Empty:
            break


def main():
    print 'creating queue'
    q = multiprocessing.Queue()

    print 'enqueuing'
    for i in range(100):
        q.put(i)

    num_processes = 15
    pool = []

    for i in range(num_processes):
        print 'launching process {0}'.format(i)
        p = multiprocessing.Process(target=worker, args=(q,))
        p.start()
        pool.append(p)

    for p in pool:
        p.join()

if __name__ == '__main__':
    main()

Running this file alone with python parallel_test.py works fine and prints the numbers as expected. But launching it from another file as another Process causes a problem. My main file:

# main_loop_test.py
import parallel_test
from multiprocessing import Pool
import time


def main():
    targets = [parallel_test.main]

    running = True

    while running:
        try:
            p = Pool(12)

            for target in targets:
                p.apply_async(target)

            p.close()  # For some reason you need to run close() before join()
            p.join()  # Wait for all the steps to be done

            print 'All steps done'

            time.sleep(2)

        except KeyboardInterrupt as e:
            print "<<<<<<<<<<<<<<<<<<CAUGHT KEYBOARD INTERRUPT FROM USER>>>>>>>>>>>>>>>>>>>"
            running = False


if __name__ == '__main__':
    main()

parallel_test.py seems to try to launch one process (which does nothing) and then exit the function, after which main_loop_test.py prints 'All steps done'. No numbers are ever printed. Output:

creating queue
enqueuing
launching process 0
All steps done
creating queue
enqueuing
launching process 0
All steps done

What's going wrong? I get the same problem using Pool instead of managing the processes myself in parallel_test.py. Replacing multiprocessing with threading works though.

mchangun asked Feb 13 '15

2 Answers

You are not able to create child processes from parallel_test when you invoke it as a child process from another program, because the process is created as a daemonic process, and, as mentioned in the documentation at https://docs.python.org/2/library/multiprocessing.html , a daemonic process is not allowed to create child processes. You have to create the process as a non-daemonic process by setting its daemon property to False, like this:

p = multiprocessing.Process(target=parallel_test.main)
p.daemon = False
p.start()
p.join()

I am not sure how to set the daemon property when you create child processes through the Pool module. You can try to pass this property through the initializer argument.
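To make the daemon point concrete, here is a minimal, self-contained Python 3 sketch (the names `child` and `grandchild` are illustrative, not from the question): a process started with daemon=False is allowed to spawn its own children, which is exactly what Pool workers (daemonic by default) cannot do.

```python
import multiprocessing


def grandchild(q):
    # The nested process reports back through the shared queue.
    q.put("hello from grandchild")


def child(q):
    # Because this process was started with daemon=False, it is
    # allowed to create its own child processes.
    p = multiprocessing.Process(target=grandchild, args=(q,))
    p.start()
    p.join()


def main():
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=child, args=(q,))
    p.daemon = False  # the default for Process; Pool workers are daemonic
    p.start()
    p.join()
    return q.get()


if __name__ == '__main__':
    print(main())
```

If `p.daemon` were set to True here, `child` would raise "daemonic processes are not allowed to have children" when it tried to start `grandchild`, which is the same failure mode the question hits via Pool.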

vijayalakshmi d answered Oct 21 '22

Do you mean to use hierarchical parallelism through the multiprocessing module, like this example showing a blocking processing map being executed inside an asynchronous processing map?

>>> def squared(x):
...   return x**2
... 
>>> def triple(x):
...   return 3*x
... 
>>> from pathos.multiprocessing import ProcessingPool as PPool
>>> res = PPool().amap(triple, PPool().map(squared, xrange(10)))
>>> res.get()
[0, 3, 12, 27, 48, 75, 108, 147, 192, 243]

I'm using the pathos fork of multiprocessing, as it is a bit easier to use than the standard library version.
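pathos isn't strictly required for this shape of nesting. A standard-library sketch of the same idea (Python 3 syntax, my own `demo` wrapper): a blocking process-pool map feeding a non-blocking map, with the async level done by a thread pool so no daemonic worker ever has to spawn children.

```python
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool


def squared(x):
    return x ** 2


def triple(x):
    return 3 * x


def demo():
    # Blocking process-pool map feeding a non-blocking thread-pool map.
    with Pool(4) as ppool, ThreadPool(4) as tpool:
        res = tpool.map_async(triple, ppool.map(squared, range(10)))
        return res.get()


if __name__ == '__main__':
    print(demo())  # [0, 3, 12, 27, 48, 75, 108, 147, 192, 243]
```

The inner `ppool.map` completes before the outer `map_async` is submitted, so the Pool workers themselves never create processes and the daemon restriction never triggers.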

Also see another example here: https://stackoverflow.com/questions/28203774/how-to-do-hierarchical-parallelism-in-ipython-parallel

EDIT: The question at that link was deleted, so I'm adding its contents here. Here's how to do nested parallelism using two different types of pools.

>>> from pathos.multiprocessing import ProcessingPool, ThreadingPool
>>> # build a non-blocking processing pool map (i.e. async_map)
>>> amap = ProcessingPool().amap
>>> # build a blocking thread pool map
>>> tmap = ThreadingPool().map
>>>
>>> # define an 'inner' function 
>>> def g(x):
...   import random
...   return int(x * random.random())
... 
>>> # parallelize the inner function
>>> def h(x):
...   return sum(tmap(g, x))
... 
>>> # define the 'outer' function
>>> def f(x,y):
...   return x*y
... 
>>> # define two lists of different lengths
>>> x = range(10)
>>> y = range(5)
>>> 
>>> # evaluate in nested parallel (done several times, for effect)
>>> res1 = amap(f, [h(x),h(x),h(x),h(x),h(x)], y)
>>> res2 = amap(f, [h(x),h(x),h(x),h(x),h(x)], y)
>>> res3 = amap(f, [h(x),h(x),h(x),h(x),h(x)], y)
>>> res4 = amap(f, [h(x),h(x),h(x),h(x),h(x)], y)
>>> res1.get()
[0, 15, 34, 57, 64]
>>> res2.get()
[0, 21, 36, 57, 64]
>>> res3.get()
[0, 10, 40, 51, 68]                       
>>> res4.get()
[0, 28, 22, 39, 116]

Note that I intentionally did not use [h(x)]*len(y), as that would call h only once; however, it could also be called like this:

>>> def _f(m, g, x, y):
...   return sum(m(g, x)) * y
... 
>>> amap(_f, [tmap]*len(y), [g]*len(y), [x]*len(y), y).get()
[0, 26, 42, 57, 68]
>>> amap(_f, [tmap]*len(y), [g]*len(y), [x]*len(y), y).get()
[0, 20, 50, 78, 92]
>>> amap(_f, [tmap]*len(y), [g]*len(y), [x]*len(y), y).get()
[0, 12, 20, 51, 92]
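A standard-library analogue of this last variant is possible with two thread pools (a sketch with my own helper names; stdlib `map_async` takes a single iterable, so the `(map, func, x, y)` arguments are bundled into tuples rather than passed as separate lists):

```python
import random
from multiprocessing.pool import ThreadPool


def g(x):
    # 'inner' function: scale by a random factor in [0, 1)
    return int(x * random.random())


def _f(args):
    # stdlib map_async takes one iterable, so unpack the bundled arguments
    m, inner_func, x, y = args
    return sum(m(inner_func, x)) * y


def demo():
    x, y = list(range(10)), list(range(5))
    with ThreadPool(4) as inner, ThreadPool(4) as outer:
        res = outer.map_async(_f, [(inner.map, g, x, yi) for yi in y])
        return res.get()  # block for results before the pools close


if __name__ == '__main__':
    print(demo())
```

As in the pathos version, the results are random, but the first entry is always 0 (y starts at 0) and each entry is bounded by y * sum(x).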

You can get pathos here: https://github.com/uqfoundation

Mike McKerns answered Oct 21 '22