Python: how to make concurrent.futures tasks cancelable?

Python's concurrent.futures module and its ProcessPoolExecutor provide a neat interface to schedule and monitor tasks. Futures even provide a .cancel() method:

cancel(): Attempt to cancel the call. If the call is currently being executed and cannot be cancelled then the method will return False, otherwise the call will be cancelled and the method will return True.

Unfortunately, in a similar question (concerning asyncio), the answer claims that running tasks are uncancelable, citing this snippet of the documentation. But the docs don't say that; they only say cancel() returns False if the call is running AND cannot be cancelled.
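To make the quoted behaviour concrete, here is a minimal sketch of what cancel() returns for a pending future versus a running one. It uses ThreadPoolExecutor with a single worker so the outcome is deterministic; the Future class is the same one ProcessPoolExecutor hands out. The names demo_cancel, blocker, started and release are made up for this illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def demo_cancel():
    started = threading.Event()  # set once the first task is actually running
    release = threading.Event()  # lets the first task finish

    def blocker():
        started.set()
        release.wait()
        return "finished"

    with ThreadPoolExecutor(max_workers=1) as pool:
        running = pool.submit(blocker)
        pending = pool.submit(lambda: "never runs")

        started.wait()  # the only worker is now busy with blocker()

        cancelled_pending = pending.cancel()  # still queued, so this succeeds
        cancelled_running = running.cancel()  # already executing, so this fails

        release.set()
        result = running.result()

    return cancelled_pending, cancelled_running, result, pending.cancelled()


print(demo_cancel())  # (True, False, 'finished', True)
```

So in practice only futures that have not started yet can be cancelled; a running call simply carries on to completion.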

Submitting multiprocessing.Event objects to the processes is also not trivially possible (passing them as parameters, as one would with multiprocessing.Process, raises a RuntimeError).
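One possible workaround, sketched here under the assumption that cooperative cancellation is acceptable: an event created through multiprocessing.Manager() is a picklable proxy, so it can be passed to pool.submit() as an ordinary argument. The function name search, the check_every parameter and the target value are hypothetical and chosen only for this example.

```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed


def search(partition, target, stop_event, check_every=10_000):
    """Scan partition for target; give up early once stop_event is set."""
    for n, elem in enumerate(partition):
        # Polling a manager proxy is an IPC round trip, so do it sparingly.
        if n % check_every == 0 and stop_event.is_set():
            return None
        if elem == target:
            stop_event.set()  # tell the other workers to stop
            return elem
    return None


def main():
    steps = 1_000_000
    target = 1_351_355  # hypothetical target, located in the second partition
    with multiprocessing.Manager() as manager:
        stop_event = manager.Event()  # proxy object, safe to pass as an argument
        with ProcessPoolExecutor(max_workers=4) as pool:
            futures = [
                pool.submit(search, range(i * steps, (i + 1) * steps), target, stop_event)
                for i in range(4)
            ]
            results = [f.result() for f in as_completed(futures)]
    print([r for r in results if r is not None])


if __name__ == "__main__":
    main()
```

The trade-off is that workers only stop at their next check, and every check costs a round trip to the manager process, so check_every has to balance responsiveness against overhead.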

What am I trying to do? I would like to partition a search space and run a task for every partition. But ONE solution is enough, and the search is CPU-intensive. So is there a comfortable way to accomplish this that does not offset the gains of using ProcessPoolExecutor to begin with?

Example:

    from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait

    # function that profits from partitioned search space
    def m_run(partition):
        for elem in partition:
            if elem == 135135515:
                return elem
        return False

    futures = []
    # used to create the partitions
    steps = 100000000
    with ProcessPoolExecutor(max_workers=4) as pool:
        for i in range(4):
            # run 4 tasks with a partition, but only *one* solution is needed
            partition = range(i*steps,(i+1)*steps)
            futures.append(pool.submit(m_run, partition))

        done, not_done = wait(futures, return_when=FIRST_COMPLETED)
        for d in done:
            print(d.result())

        print("---")
        for d in not_done:
            # will return false for Cancel and Result for all futures
            print("Cancel: "+str(d.cancel()))
            print("Result: "+str(d.result()))
Asked by Ketzu on Mar 14 '17 at 10:03


2 Answers

I don't know why concurrent.futures.Future does not have a .kill() method, but you can accomplish what you want by shutting down the process pool with pool.shutdown(wait=False), and killing the remaining child processes by hand.

Create a function for killing child processes:

    import signal
    import psutil

    def kill_child_processes(parent_pid, sig=signal.SIGTERM):
        try:
            parent = psutil.Process(parent_pid)
        except psutil.NoSuchProcess:
            return
        children = parent.children(recursive=True)
        for process in children:
            process.send_signal(sig)

Run your code until you get the first result, then kill all remaining child processes:

    import os
    from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait

    # function that profits from partitioned search space
    def m_run(partition):
        for elem in partition:
            if elem == 135135515:
                return elem
        return False

    futures = []
    # used to create the partitions
    steps = 100000000
    pool = ProcessPoolExecutor(max_workers=4)
    for i in range(4):
        # run 4 tasks with a partition, but only *one* solution is needed
        partition = range(i*steps,(i+1)*steps)
        futures.append(pool.submit(m_run, partition))

    done, not_done = wait(futures, timeout=3600, return_when=FIRST_COMPLETED)

    # Shut down pool
    pool.shutdown(wait=False)

    # Kill remaining child processes (kill_child_processes is defined above)
    kill_child_processes(os.getpid())
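If swapping out the executor is an option, a stdlib-only variation on the same idea is multiprocessing.Pool, which documents a terminate() method that stops the workers immediately. This is a different technique from the psutil approach above, shown only as a rough sketch; scan_partition, first_solution and the small TARGET value are hypothetical names and numbers for the example. Note that it waits for the first non-None result rather than the first completed task.

```python
import multiprocessing

TARGET = 135_135  # hypothetical value to search for


def scan_partition(partition):
    """Return TARGET if it occurs in partition, else None."""
    for elem in partition:
        if elem == TARGET:
            return elem
    return None


def first_solution(results):
    """Return the first non-None item from an iterable of results."""
    for result in results:
        if result is not None:
            return result
    return None


def main():
    steps = 100_000
    partitions = [range(i * steps, (i + 1) * steps) for i in range(4)]
    with multiprocessing.Pool(processes=4) as pool:
        # imap_unordered yields results as workers finish, in any order.
        found = first_solution(pool.imap_unordered(scan_partition, partitions))
    # Leaving the with-block calls pool.terminate(), which stops any
    # workers that are still scanning their partitions.
    print(found)


if __name__ == "__main__":
    main()
```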
Answered by ostrokach on Sep 19 '22 at 14:09


Unfortunately, running Futures cannot be cancelled. I believe the core reason is to ensure the same API over different implementations (it's not possible to interrupt running threads or coroutines).

The Pebble library was designed to overcome this and other limitations.

    from pebble import ProcessPool

    def function(foo, bar=0):
        return foo + bar

    with ProcessPool() as pool:
        future = pool.schedule(function, args=[1])

        # if running, the container process will be terminated
        # a new process will be started consuming the next task
        future.cancel()
Answered by noxdafox on Sep 19 '22 at 14:09