 

How to use concurrent.futures with timeouts?

I am trying to get timeouts to work in Python 3.2 using the concurrent.futures module. However, when a timeout occurs it doesn't really stop the execution. I tried both the thread and process pool executors, and neither of them stops the task; the timeout only seems to be raised once the task has finished. So does anyone know if it's possible to get this working?

import concurrent.futures
import time
import datetime

max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000]

def run_loop(max_number):
    print("Started:", datetime.datetime.now(), max_number)
    last_number = 0
    for i in range(1, max_number + 1):
        last_number = i * i
    return last_number

def main():
    with concurrent.futures.ProcessPoolExecutor(max_workers=len(max_numbers)) as executor:
        try:
            for future in concurrent.futures.as_completed(executor.map(run_loop, max_numbers, timeout=1), timeout=1):
                print(future.result(timeout=1))
        except concurrent.futures.TimeoutError:
            print("This took too long...")

if __name__ == '__main__':
    main()
asked Jun 28 '11 by Joseph Montanez




1 Answer

As far as I can tell, TimeoutError is actually raised when you would expect it, and not after the task is finished.

However, your program itself will keep on running until all running tasks have completed. This is because currently executing tasks (in your case, probably all of your submitted tasks, since your pool size equals the number of tasks) are not actually "killed".

The TimeoutError is raised so that you can choose not to wait until the task is finished (and do something else instead), but the task itself will keep on running until it completes. And Python will not exit as long as there are unfinished tasks in the threads/subprocesses of your Executor.
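To illustrate this, here is a stripped-down sketch (not your original code; slow_task is just a stand-in for a long-running job): result(timeout=1) raises the TimeoutError after about a second, but the program still sits at the end of the with block until the worker has finished.

import concurrent.futures
import time

def slow_task():
    time.sleep(10)  # stand-in for a long-running job
    return "done"

def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(slow_task)
        try:
            print(future.result(timeout=1))  # gives up waiting after ~1 second
        except concurrent.futures.TimeoutError:
            print("Timed out, but the worker is still running...")
    # leaving the 'with' block calls executor.shutdown(), which waits here
    # until slow_task has actually finished (roughly 9 more seconds)

if __name__ == '__main__':
    main()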

As far as I know, it is not possible to just "stop" currently executing Futures; you can only "cancel" scheduled tasks that have yet to be started. In your case there won't be any, but imagine that you have a pool of 5 threads/processes and you want to process 100 items. At some point, there might be 20 completed tasks, 5 running tasks, and 75 tasks scheduled. In this case you would be able to cancel those 75 scheduled tasks, but the 5 that are running will continue until completed, whether you wait for the result or not.
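A small illustration of that difference (hypothetical numbers, not your exact setup): with a pool of 2 workers and 6 submitted tasks, cancel() succeeds for the futures that are still queued but returns False for the ones already running.

import concurrent.futures
import time

def work(n):
    time.sleep(2)  # long enough that the first two tasks are still running
    return n

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(work, i) for i in range(6)]
    time.sleep(0.1)  # give the first two tasks a chance to start
    for i, future in enumerate(futures):
        if future.cancel():
            print("task", i, "cancelled (was still queued)")
        else:
            print("task", i, "could not be cancelled (already running)")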

Even though it cannot be done that way, I guess there should be ways to achieve your desired end result. Maybe this version can help you on the way (not sure if it does exactly what you wanted, but it might be of some use):

import concurrent.futures
import time
import datetime

max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000]

class Task:
    def __init__(self, max_number):
        self.max_number = max_number
        self.interrupt_requested = False

    def __call__(self):
        print("Started:", datetime.datetime.now(), self.max_number)
        last_number = 0
        for i in range(1, self.max_number + 1):
            if self.interrupt_requested:
                print("Interrupted at", i)
                break
            last_number = i * i
        print("Reached the end")
        return last_number

    def interrupt(self):
        self.interrupt_requested = True

def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(max_numbers)) as executor:
        tasks = [Task(num) for num in max_numbers]
        for task, future in [(i, executor.submit(i)) for i in tasks]:
            try:
                print(future.result(timeout=1))
            except concurrent.futures.TimeoutError:
                print("this took too long...")
                task.interrupt()

if __name__ == '__main__':
    main()

By creating a callable object for each "task" and giving those to the executor instead of just a plain function, you can provide a way to "interrupt" the task. Tip: remove the task.interrupt() line and see what happens; it may make it easier to understand my long explanation above ;-)
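One caveat I should add: this shared-flag trick relies on threads sharing memory, so it works with ThreadPoolExecutor but not with ProcessPoolExecutor, where the Task object gets pickled into the worker process and setting the flag in the parent has no effect there. If you really need processes, something like a multiprocessing.Manager().Event() could be passed to the worker instead, roughly like this (an untested sketch, adapted from your run_loop, not a drop-in replacement):

import concurrent.futures
import multiprocessing

def run_loop(max_number, stop_event):
    last_number = 0
    for i in range(1, max_number + 1):
        # checking the proxy every iteration would be slow, so only check periodically
        if i % 100000 == 0 and stop_event.is_set():
            print("Interrupted at", i)
            break
        last_number = i * i
    return last_number

def main():
    with multiprocessing.Manager() as manager:
        stop_event = manager.Event()  # proxy object that can be shared across processes
        with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
            future = executor.submit(run_loop, 10000000, stop_event)
            try:
                print(future.result(timeout=1))
            except concurrent.futures.TimeoutError:
                print("This took too long, asking the worker to stop...")
                stop_event.set()  # the worker sees this and returns early

if __name__ == '__main__':
    main()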

answered Oct 01 '22 by Steven