Python's concurrent.futures module and its ProcessPoolExecutor provide a neat interface to schedule and monitor tasks. Futures even provide a .cancel() method:
cancel(): Attempt to cancel the call. If the call is currently being executed and cannot be cancelled then the method will return False, otherwise the call will be cancelled and the method will return True.
Unfortunately, in a similar question (concerning asyncio) the answer claims that running tasks are uncancellable, citing this snippet of the documentation. But the docs don't say that; they only say cancel() fails if the call is running AND cannot be cancelled.
Submitting multiprocessing.Event objects to the processes is also not trivially possible (passing them as parameters, as one would with multiprocessing.Process, raises a RuntimeError).
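For reference, an Event proxy obtained from multiprocessing.Manager() can be pickled, so it can be passed to pool.submit() where a plain multiprocessing.Event cannot. The following is only a minimal sketch of cooperative cancellation under that assumption: the names search, find_first and CHECK_EVERY are hypothetical, the polling interval is an arbitrary guess, and None (rather than False) marks "not found" so that a hit on element 0 is not mistaken for a miss.

```python
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import Manager

CHECK_EVERY = 10_000  # hypothetical polling interval; each check is a round trip to the manager


def search(partition, target, stop_event, check_every=CHECK_EVERY):
    """Scan one partition; give up early once stop_event has been set."""
    for count, elem in enumerate(partition):
        if elem == target:
            return elem
        # Poll only occasionally so the proxy call does not dominate the loop.
        if count % check_every == 0 and stop_event.is_set():
            return None
    return None


def find_first(target, steps, workers=4):
    """Search `workers` partitions of size `steps`; stop all of them after the first hit."""
    with Manager() as manager:
        stop_event = manager.Event()  # proxy object, safe to pass to worker processes
        with ProcessPoolExecutor(max_workers=workers) as pool:
            futures = [
                pool.submit(search, range(i * steps, (i + 1) * steps), target, stop_event)
                for i in range(workers)
            ]
            found = None
            for future in as_completed(futures):
                result = future.result()
                if result is not None:
                    found = result
                    stop_event.set()  # ask the remaining workers to return early
                    break
        return found


if __name__ == "__main__":
    print(find_first(target=135_135, steps=100_000))
```

The trade-off is that the workers are not interrupted; they only notice the event at the next poll, so a smaller check_every reacts faster but spends more time talking to the manager process.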
What am I trying to do? I would like to partition a search space and run a task for every partition. But it is enough to have ONE solution, and the process is CPU intensive. So is there a comfortable way to accomplish this that does not offset the gains of using ProcessPoolExecutor to begin with?
Example:
    from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait

    # function that profits from partitioned search space
    def m_run(partition):
        for elem in partition:
            if elem == 135135515:
                return elem
        return False

    futures = []
    # used to create the partitions
    steps = 100000000
    with ProcessPoolExecutor(max_workers=4) as pool:
        for i in range(4):
            # run 4 tasks with a partition, but only *one* solution is needed
            partition = range(i*steps, (i+1)*steps)
            futures.append(pool.submit(m_run, partition))

        done, not_done = wait(futures, return_when=FIRST_COMPLETED)
        for d in done:
            print(d.result())

        print("---")
        for d in not_done:
            # will return false for Cancel and Result for all futures
            print("Cancel: "+str(d.cancel()))
            print("Result: "+str(d.result()))
I don't know why concurrent.futures.Future does not have a .kill() method, but you can accomplish what you want by shutting down the process pool with pool.shutdown(wait=False), and killing the remaining child processes by hand.
Create a function for killing child processes:
    import signal, psutil

    def kill_child_processes(parent_pid, sig=signal.SIGTERM):
        try:
            parent = psutil.Process(parent_pid)
        except psutil.NoSuchProcess:
            return
        children = parent.children(recursive=True)
        for process in children:
            process.send_signal(sig)
Run your code until you get the first result, then kill all remaining child processes:
    import os
    from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait

    # function that profits from partitioned search space
    def m_run(partition):
        for elem in partition:
            if elem == 135135515:
                return elem
        return False

    futures = []
    # used to create the partitions
    steps = 100000000
    pool = ProcessPoolExecutor(max_workers=4)
    for i in range(4):
        # run 4 tasks with a partition, but only *one* solution is needed
        partition = range(i*steps, (i+1)*steps)
        futures.append(pool.submit(m_run, partition))

    done, not_done = wait(futures, timeout=3600, return_when=FIRST_COMPLETED)

    # Shut down pool
    pool.shutdown(wait=False)

    # Kill remaining child processes (uses kill_child_processes defined above)
    kill_child_processes(os.getpid())
Unfortunately, running Futures cannot be cancelled. I believe the core reason is to ensure the same API over different implementations (it's not possible to interrupt running threads or coroutines).
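The difference between a running and a still-pending Future can be checked with a small sketch. It deliberately uses a ThreadPoolExecutor with a single worker so the timing is deterministic; the Future states are the same for both executors, though a process pool may already have handed a few queued calls to its workers, in which case those cannot be cancelled either. The names cancel_demo and blocker are hypothetical.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def cancel_demo():
    """Try to cancel one running and one pending future and report what happened."""
    started = threading.Event()  # set once the first task is really executing
    release = threading.Event()  # lets the blocked task finish

    def blocker():
        started.set()
        release.wait()
        return "finished"

    with ThreadPoolExecutor(max_workers=1) as pool:
        running = pool.submit(blocker)
        pending = pool.submit(blocker)  # queued behind the only worker
        started.wait()                  # make sure `running` has actually started
        running_cancelled = running.cancel()  # already running, so this fails
        pending_cancelled = pending.cancel()  # never started, so this succeeds
        release.set()                   # let the running task complete normally
    return running_cancelled, pending_cancelled, running.result(), pending.cancelled()


if __name__ == "__main__":
    print(cancel_demo())  # (False, True, 'finished', True)
```

So cancel() is best read as "remove from the queue if still possible", not as "interrupt the work".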
The Pebble library was designed to overcome this and other limitations.
    from pebble import ProcessPool

    def function(foo, bar=0):
        return foo + bar

    with ProcessPool() as pool:
        future = pool.schedule(function, args=[1])

        # if running, the container process will be terminated
        # a new process will be started consuming the next task
        future.cancel()