Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Stopping processes in ThreadPool in Python

I've been trying to write an interactive wrapper (for use in ipython) for a library that controls some hardware. Some calls are heavy on the IO so it makes sense to carry out the tasks in parallel. Using a ThreadPool (almost) works nicely:

from multiprocessing.pool import ThreadPool

class hardware():
    def __init__(IPaddress):
        connect_to_hardware(IPaddress)

    def some_long_task_to_hardware(wtime):
        wait(wtime)
        result = 'blah'
        return result

pool = ThreadPool(processes=4)
Threads=[]
h=[hardware(IP1),hardware(IP2),hardware(IP3),hardware(IP4)]
for tt in range(4):
    task=pool.apply_async(h[tt].some_long_task_to_hardware,(1000))
    threads.append(task)
alive = [True]*4
Try:
    while any(alive) :
        for tt in range(4): alive[tt] = not threads[tt].ready()
        do_other_stuff_for_a_bit()
except:
    #some command I cannot find that will stop the threads...
    raise
for tt in range(4): print(threads[tt].get())

The problem comes if the user wants to stop the process or there is an IO error in do_other_stuff_for_a_bit(). Pressing Ctrl+C stops the main process but the worker threads carry on running until their current task is complete.
Is there some way to stop these threads without having to rewrite the library or have the user exit python? pool.terminate() and pool.join() that I have seen used in other examples do not seem to do the job.

The actual routine (instead of the simplified version above) uses logging and although all the worker threads are shut down at some point, I can see the processes that they started running carry on until complete (and being hardware I can see their effect by looking across the room).

This is in python 2.7.

UPDATE:

The solution seems to be to switch to using multiprocessing.Process instead of a thread pool. The test code I tried is to run foo_pulse:

class foo(object):
    def foo_pulse(self,nPulse,name): #just one method of *many*
        print('starting pulse for '+name)
        result=[]
        for ii in range(nPulse):
            print('on for '+name)
            time.sleep(2)
            print('off for '+name)
            time.sleep(2)
            result.append(ii)
        return result,name

If you try running this using ThreadPool then ctrl-C does not stop foo_pulse from running (even though it does kill the threads right away, the print statements keep on coming:

from multiprocessing.pool import ThreadPool
import time
def test(nPulse):
    a=foo()
    pool=ThreadPool(processes=4)
    threads=[]
    for rn in range(4) :
        r=pool.apply_async(a.foo_pulse,(nPulse,'loop '+str(rn)))
        threads.append(r)
    alive=[True]*4
    try:
        while any(alive) : #wait until all threads complete
            for rn in range(4):
                alive[rn] = not threads[rn].ready() 
                time.sleep(1)
    except : #stop threads if user presses ctrl-c
        print('trying to stop threads')
        pool.terminate()
        print('stopped threads') # this line prints but output from foo_pulse carried on.
        raise
    else : 
        for t in threads : print(t.get())

However a version using multiprocessing.Process works as expected:

import multiprocessing as mp
import time
def test_pro(nPulse):
    pros=[]
    ans=[]
    a=foo()
    for rn in range(4) :
        q=mp.Queue()
        ans.append(q)
        r=mp.Process(target=wrapper,args=(a,"foo_pulse",q),kwargs={'args':(nPulse,'loop '+str(rn))})
        r.start()
        pros.append(r)
    try:
        for p in pros : p.join()
        print('all done')
    except : #stop threads if user stops findRes
        print('trying to stop threads')
        for p in pros : p.terminate()
        print('stopped threads')
    else : 
        print('output here')
        for q in ans :
            print(q.get())
    print('exit time')

Where I have defined a wrapper for the library foo (so that it did not need to be re-written). If the return value is not needed the neither is this wrapper :

def wrapper(a,target,q,args=(),kwargs={}):
    '''Used when return value is wanted'''
    q.put(getattr(a,target)(*args,**kwargs))

From the documentation I see no reason why a pool would not work (other than a bug).

like image 303
SRD Avatar asked Aug 09 '16 17:08

SRD


People also ask

How do you stop Threadpool?

Call cancel() on the Future to Cancel a Task You can cancel tasks submitted to the ThreadPoolExecutor by calling the cancel() function on the Future object. Recall that you will receive a Future object when you submit your task to the thread pool by calling the submit() function.

How do you close a Threadpool in Python?

The ThreadPoolExecutor in Python provides a thread pool that lets you run tasks concurrently. You can add tasks to the pool by calling submit() with your function name, which will return a Future object. You can call the cancel() function on the Future object to cancel the task before it has started running.

How do I stop Qrunnable?

What you want to do is make your background task a QObject, give it a run() slot, and connect the thread. started to the worker. run. That's what will kick the process off.


1 Answers

This is a very interesting use of parallelism.

However, if you are using multiprocessing, the goal is to have many processes running in parallel, as opposed to one process running many threads.

Consider these few changes to implement it using multiprocessing:

You have these functions that will run in parallel:

import time
import multiprocessing as mp


def some_long_task_from_library(wtime):
    time.sleep(wtime)


class MyException(Exception): pass

def do_other_stuff_for_a_bit():
    time.sleep(5)
    raise MyException("Something Happened...")

Let's create and start the processes, say 4:

procs = []  # this is not a Pool, it is just a way to handle the
            # processes instead of calling them p1, p2, p3, p4...
for _ in range(4):
    p = mp.Process(target=some_long_task_from_library, args=(1000,))
    p.start()
    procs.append(p)
mp.active_children()   # this joins all the started processes, and runs them.

The processes are running in parallel, presumably in a separate cpu core, but that is to the OS to decide. You can check in your system monitor.

In the meantime you run a process that will break, and you want to stop the running processes, not leaving them orphan:

try:
    do_other_stuff_for_a_bit()
except MyException as exc:
    print(exc)
    print("Now stopping all processes...")
    for p in procs:
        p.terminate()
print("The rest of the process will continue")

If it doesn't make sense to continue with the main process when one or all of the subprocesses have terminated, you should handle the exit of the main program.

Hope it helps, and you can adapt bits of this for your library.

like image 96
chapelo Avatar answered Sep 22 '22 12:09

chapelo