
Terminate a Python multiprocessing program once a one of its workers meets a certain condition

I am writing a Python program using its multiprocessing module. The program calls a number of worker functions, each yielding a random number. I need to terminate the program once one of the workers has produced a number larger than 0.7.

Below is my program where the "how to do this" part is not yet filled out. Any idea? Thanks.

import time
import numpy as np
import multiprocessing as mp

def f(i):
    # seed each worker differently so they draw different numbers
    np.random.seed(int(time.time() + i))

    time.sleep(3)
    res = np.random.rand()
    print("From i =", i, "       res =", res)
    if res > 0.7:
        print("find it")
        # terminate  ???? Question: How to do this???


if __name__ == '__main__':
    num_workers = mp.cpu_count()
    for i in range(num_workers):
        p = mp.Process(target=f, args=(i,))
        p.start()
asked May 01 '16 by zell


2 Answers

No process can stop another short of brute force os.kill()-like sledgehammers. Don't go there.

To do this sanely, you need to rework your basic approach: the main process and the worker processes need to communicate with each other.

I'd flesh it out, but the example so far is too bare-bones to make it useful. For example, as written, no more than num_workers calls to rand() are ever made, so there's no reason to believe any of them must be > 0.7.

Once the worker function grows a loop, then it becomes more obvious. For example, the worker could check to see if an mp.Event is set at the top of the loop, and just exit if it is. The main process would set the Event when it wants the workers to stop.

And a worker could set a different mp.Event when it found a value > 0.7. The main process would wait for that Event, then set the "time to stop" Event for workers to see, then do the usual loop .join()-ing the workers for a clean shutdown.

EDIT

Here's fleshing out a portable, clean solution, assuming the workers are going to keep going until at least one finds a value > 0.7. Note that I removed numpy from this, because it's irrelevant to this code. The code here should work fine under any stock Python on any platform supporting multiprocessing:

import random
from time import sleep

def worker(i, quit, foundit):
    print("%d started" % i)
    while not quit.is_set():
        x = random.random()
        if x > 0.7:
            print('%d found %g' % (i, x))
            foundit.set()
            break
        sleep(0.1)
    print("%d is done" % i)

if __name__ == "__main__":
    import multiprocessing as mp
    quit = mp.Event()
    foundit = mp.Event()
    procs = []
    for i in range(mp.cpu_count()):
        p = mp.Process(target=worker, args=(i, quit, foundit))
        p.start()
        procs.append(p)
    foundit.wait()
    quit.set()
    for p in procs:
        p.join()

And some sample output:

0 started
1 started
2 started
2 found 0.922803
2 is done
3 started
3 is done
4 started
4 is done
5 started
5 is done
6 started
6 is done
7 started
7 is done
0 is done
1 is done

Everything shuts down cleanly: no tracebacks, no abnormal terminations, no zombie processes left behind ... clean as a whistle.

KILLING IT

As @noxdafox pointed out, there's a Pool.terminate() method that does the best it can, across platforms, to kill worker processes no matter what they're doing (e.g., on Windows it calls the platform TerminateProcess()). I don't recommend it for production code, because killing a process abruptly can leave various shared resources in inconsistent states, or let them leak. There are various warnings about that in the multiprocessing docs, and you should consult your OS docs too.

Still, it can be expedient! Here's a full program using this approach. Note that I bumped the cutoff to 0.95, to make this more likely to take longer than an eyeblink to run:

import random
from time import sleep

def worker(i):
    print("%d started" % i)
    while True:
        x = random.random()
        print('%d found %g' % (i, x))
        if x > 0.95:
            return x  # returning a result triggers the callback
        sleep(0.5)

# callback running only in __main__
def quit(arg):
    print("quitting with %g" % arg)
    # note: p is visible because it's global in __main__
    p.terminate()  # kill all pool workers

if __name__ == "__main__":
    import multiprocessing as mp
    ncpu = mp.cpu_count()
    p = mp.Pool(ncpu)
    for i in range(ncpu):
        p.apply_async(worker, args=(i,), callback=quit)
    p.close()
    p.join()

And some sample output:

$ python mptest.py
0 started
0 found 0.391351
1 started
1 found 0.767374
2 started
2 found 0.110969
3 started
3 found 0.611442
4 started
4 found 0.790782
5 started
5 found 0.554611
6 started
6 found 0.0483844
7 started
7 found 0.862496
0 found 0.27175
1 found 0.0398836
2 found 0.884015
3 found 0.988702
quitting with 0.988702
4 found 0.909178
5 found 0.336805
6 found 0.961192
7 found 0.912875
$ [the program ended]
answered Sep 24 '22 by Tim Peters


There is a much cleaner and more Pythonic way to do what you want, achieved by using the callback functions offered by multiprocessing.Pool.

You can check this question to see an implementation example.
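For reference, here is a minimal sketch of that callback pattern (the names `search`, `on_found`, and `main` are illustrative, not from the linked question). A `threading.Event` set inside the callback lets the main thread call `Pool.terminate()` itself, rather than terminating the pool from its own result-handler thread:

```python
import multiprocessing as mp
import random
import threading
import time

def search(i):
    """Keep drawing random numbers until one exceeds the cutoff."""
    while True:
        x = random.random()
        if x > 0.7:
            return i, x  # returning a result triggers the callback
        time.sleep(0.05)

def main():
    results = []
    done = threading.Event()

    def on_found(result):
        # Callback runs in a thread of the main process as soon as
        # any worker returns a result.
        results.append(result)
        done.set()

    pool = mp.Pool(mp.cpu_count())
    for i in range(mp.cpu_count()):
        pool.apply_async(search, args=(i,), callback=on_found)
    done.wait()       # block until some worker finds a value
    pool.terminate()  # kill the remaining workers
    pool.join()
    return results

if __name__ == "__main__":
    for i, x in main():
        print("worker %d found %g" % (i, x))
```

Because `terminate()` is called from the main thread after `done.wait()`, the shutdown path stays out of the pool's internal callback machinery, which is a slightly more conservative variant of the callback-plus-terminate approach shown above.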

answered Sep 25 '22 by noxdafox