I am trying to get timeouts to work in Python 3.2 using the concurrent.futures module. However, when a timeout occurs, it doesn't really stop the execution. I tried with both the thread and process pool executors; neither of them stops the task, and the timeout only seems to be raised once the task has finished. So does anyone know if it's possible to get this working?
import concurrent.futures
import datetime

max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000]

def run_loop(max_number):
    print("Started:", datetime.datetime.now(), max_number)
    last_number = 0
    for i in range(1, max_number + 1):
        last_number = i * i
    return last_number

def main():
    with concurrent.futures.ProcessPoolExecutor(max_workers=len(max_numbers)) as executor:
        try:
            for future in concurrent.futures.as_completed(executor.map(run_loop, max_numbers, timeout=1), timeout=1):
                print(future.result(timeout=1))
        except concurrent.futures.TimeoutError:
            print("This took too long...")

if __name__ == '__main__':
    main()
ThreadPoolExecutor methods: submit(fn, *args, **kwargs) schedules a callable for execution and returns a Future object representing the state of that call. map(fn, *iterables, timeout=None, chunksize=1) maps the callable over the iterables; iterating the returned results raises concurrent.futures.TimeoutError if a result is not available within the given timeout.
The concurrent.futures module provides a high-level interface for asynchronously executing callables. The asynchronous execution can be performed with threads, using ThreadPoolExecutor, or in separate processes, using ProcessPoolExecutor.
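To make that interface concrete, here is a minimal sketch (not part of the original question) showing submit() returning a Future and map() applying a timeout to result retrieval; square() is just a toy function for illustration:

import concurrent.futures

def square(n):
    # toy function used only for illustration
    return n * n

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # submit() returns a Future immediately; result() blocks until the call finishes
    future = executor.submit(square, 7)
    print(future.result())  # 49

    # map() yields results in input order; iterating it raises
    # concurrent.futures.TimeoutError if a result is not ready within the
    # timeout, but the worker itself keeps running
    for value in executor.map(square, [1, 2, 3], timeout=5):
        print(value)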
While almost every language has support for threads and locks, CPython remains special in its use of a global interpreter lock (GIL), which prevents more than one thread from executing Python bytecode at a time, because CPython's memory management is not thread-safe.
You can attempt to cancel a task in the ProcessPoolExecutor by calling cancel() on its Future, but this only succeeds for tasks that have not started running yet; it does not stop a task that a worker has already picked up.
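A small sketch of that behaviour (using ThreadPoolExecutor for simplicity; the Future API is the same for ProcessPoolExecutor). slow() is a made-up helper:

import concurrent.futures
import time

def slow(seconds):
    # made-up helper that just sleeps
    time.sleep(seconds)
    return seconds

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    running = executor.submit(slow, 5)  # picked up immediately by the single worker
    queued = executor.submit(slow, 5)   # stays pending until the worker is free

    time.sleep(0.1)                     # give the first task time to start
    print(running.cancel())             # False: already running, cannot be stopped
    print(queued.cancel())              # True: still pending, cancelled successfully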
As far as I can tell, TimeoutError is actually raised when you would expect it, and not after the task is finished.
However, your program itself will keep on running until all running tasks have completed. This is because currently executing tasks (in your case, probably all of your submitted tasks, since your pool size equals the number of tasks) are not actually "killed".
The TimeoutError is raised so that you can choose not to wait until the task is finished (and do something else instead), but the task will keep on running until completed. And Python will not exit as long as there are unfinished tasks in the threads/subprocesses of your Executor.
As far as I know, it is not possible to just "stop" currently executing Futures; you can only "cancel" scheduled tasks that have yet to be started. In your case there won't be any, but imagine that you have a pool of 5 threads/processes and you want to process 100 items. At some point, there might be 20 completed tasks, 5 running tasks, and 75 tasks scheduled. In this case, you would be able to cancel those 75 scheduled tasks, but the 5 that are running will continue until completed, whether you wait for their results or not.
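To illustrate that scenario, here is a sketch (with the numbers shrunk so it runs quickly): 100 tasks go to a pool of 5 workers, we wait with a timeout, and cancel() is then attempted on everything that has not finished; only the futures that have not started are actually cancelled, while the running ones complete on their own. work() is a made-up placeholder:

import concurrent.futures
import time

def work(n):
    # made-up placeholder for a unit of work
    time.sleep(1)
    return n * n

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(work, n) for n in range(100)]

    # wait at most 2 seconds for results
    done, not_done = concurrent.futures.wait(futures, timeout=2)

    # cancel() only succeeds for futures that have not started yet;
    # the (at most) five currently running tasks run to completion
    cancelled = sum(f.cancel() for f in not_done)
    print(len(done), "finished,", cancelled, "cancelled")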
Even though it cannot be done that way, I guess there should be ways to achieve your desired end result. Maybe this version can help you on the way (not sure if it does exactly what you wanted, but it might be of some use):
import concurrent.futures
import datetime

max_numbers = [10000000, 10000000, 10000000, 10000000, 10000000]

class Task:
    def __init__(self, max_number):
        self.max_number = max_number
        self.interrupt_requested = False

    def __call__(self):
        print("Started:", datetime.datetime.now(), self.max_number)
        last_number = 0
        for i in range(1, self.max_number + 1):
            if self.interrupt_requested:
                print("Interrupted at", i)
                break
            last_number = i * i
        print("Reached the end")
        return last_number

    def interrupt(self):
        self.interrupt_requested = True

def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(max_numbers)) as executor:
        tasks = [Task(num) for num in max_numbers]
        for task, future in [(i, executor.submit(i)) for i in tasks]:
            try:
                print(future.result(timeout=1))
            except concurrent.futures.TimeoutError:
                print("this took too long...")
                task.interrupt()

if __name__ == '__main__':
    main()
By creating a callable object for each "task", and giving those to the executor instead of just a plain function, you can provide a way to "interrupt" the task. Tip: remove the task.interrupt() line and see what happens; it may make my long explanation above easier to follow ;-)
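One caveat: this cooperative flag only works with ThreadPoolExecutor. With ProcessPoolExecutor the Task object is pickled and each worker process gets its own copy, so setting interrupt_requested in the parent has no effect. A possible process-friendly variant (a sketch only, using hypothetical names run_loop and stop_event) is to share a multiprocessing.Manager Event as the stop flag:

import concurrent.futures
import datetime
import multiprocessing

def run_loop(max_number, stop_event):
    # the stop flag lives in a Manager process, so all workers see the same Event
    print("Started:", datetime.datetime.now(), max_number)
    last_number = 0
    for i in range(1, max_number + 1):
        # proxy calls are comparatively slow, so only poll the flag occasionally
        if i % 100000 == 0 and stop_event.is_set():
            print("Interrupted at", i)
            break
        last_number = i * i
    return last_number

def main():
    manager = multiprocessing.Manager()
    stop_event = manager.Event()
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(run_loop, n, stop_event)
                   for n in [10000000] * 5]
        try:
            for future in futures:
                print(future.result(timeout=1))
        except concurrent.futures.TimeoutError:
            print("this took too long...")
            stop_event.set()  # ask every worker to stop at its next poll

if __name__ == '__main__':
    main()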