Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Python: concurrent.futures: cancel not possible [duplicate]

I would like to start a blocking function in an Executor using the asyncio call loop.run_in_executor and then cancel it later, but that doesn't seem to be working for me.

Here is the code:

import asyncio
import time

from concurrent.futures import ThreadPoolExecutor


def blocking_func(seconds_to_block):
    for i in range(seconds_to_block):
        print('blocking {}/{}'.format(i, seconds_to_block))
        time.sleep(1)

    print('done blocking {}'.format(seconds_to_block))


@asyncio.coroutine
def non_blocking_func(seconds):
    for i in range(seconds):
        print('yielding {}/{}'.format(i, seconds))
        yield from asyncio.sleep(1)

    print('done non blocking {}'.format(seconds))


@asyncio.coroutine
def main():
    non_blocking_futures = [non_blocking_func(x) for x in range(1, 4)]
    blocking_future = loop.run_in_executor(None, blocking_func, 5)
    print('wait a few seconds!')
    yield from asyncio.sleep(1.5)

    blocking_future.cancel()
    yield from asyncio.wait(non_blocking_futures)



loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
loop.set_default_executor(executor)
asyncio.async(main())
loop.run_forever()

I would expect the code above to only allow the blocking function to output:

blocking 0/5
blocking 1/5

and then see the output of the non blocking function. But instead the blocking future continues on even after I have canceled.

Is it possible? Is there some other way of doing it?

Thanks

Edit: More discussion on running blocking and non-blocking code using asyncio: How to interface blocking and non-blocking code with asyncio

like image 338
Brendan Maguire Avatar asked Oct 16 '14 20:10

Brendan Maguire


People also ask

How does concurrent futures () work in Python?

The concurrent. futures module provides a high-level interface for asynchronously executing callables. The asynchronous execution can be performed with threads, using ThreadPoolExecutor , or separate processes, using ProcessPoolExecutor .

How do you exit futures in Python?

Call cancel() on the Future to Cancel a Task The Future object has a function called cancel() that will cancel the task if it has not yet started running and is not done. The cancel() function returns True if the task was canceled, otherwise False if the task could not be canceled.

Is ThreadPoolExecutor thread safe python?

ThreadPoolExecutor Thread-Safety Although the ThreadPoolExecutor uses threads internally, you do not need to work with threads directly in order to execute tasks and get results. Nevertheless, when accessing resources or critical sections, thread-safety may be a concern.

How do you stop a ProcessPoolExecutor?

The ProcessPoolExecutor in Python provides a process pool that lets you run tasks concurrently. You can add tasks to the pool by calling submit() with your function name, which will return a Future object. You can call the cancel() function on the Future object to cancel the task before it has started running.


2 Answers

In this case, there is no way to cancel the Future once it has actually started running, because you're relying on the behavior of concurrent.futures.Future, and its docs state the following:

cancel()

Attempt to cancel the call. If the call is currently being executed and cannot be cancelled then the method will return False, otherwise the call will be cancelled and the method will return True.

So, the only time the cancellation would be successful is if the task is still pending inside of the Executor. Now, you're actually using an asyncio.Future wrapped around a concurrent.futures.Future, and in practice the asyncio.Future returned by loop.run_in_executor() will raise a CancellationError if you try to yield from it after you call cancel(), even if the underlying task is actually already running. But, it won't actually cancel the execution of the task inside the Executor.

If you need to actually cancel the task, you'll need to use a more conventional method of interrupting the task running in the thread. The specifics of how you do that is use-case dependent. For the use-case you presented in the example, you could use a threading.Event:

def blocking_func(seconds_to_block, event):
    for i in range(seconds_to_block):
        if event.is_set():
            return
        print('blocking {}/{}'.format(i, seconds_to_block))
        time.sleep(1)

    print('done blocking {}'.format(seconds_to_block))


...
event = threading.Event()
blocking_future = loop.run_in_executor(None, blocking_func, 5, event)
print('wait a few seconds!')
yield from asyncio.sleep(1.5)

blocking_future.cancel()  # Mark Future as cancelled
event.set() # Actually interrupt blocking_func
like image 92
dano Avatar answered Oct 07 '22 20:10

dano


As threads share the same memory address space of a process, there is no safe way to terminate a running thread. This is the reason why most programming languages do not allow to kill running threads (there are lots of ugly hacks around this limitation).

Java learnt it the hard way.

A solution would consist in running your function in a separate process instead of a thread and terinate it gracefully.

The Pebble library offers an interface similar to concurrent.futures supporting running Futures to be cancelled.

from pebble import ProcessPool

def function(foo, bar=0):
    return foo + bar

with ProcessPool() as pool:
    future = pool.schedule(function, args=[1])

    # if running, the container process will be terminated 
    # a new process will be started consuming the next task
    future.cancel()  
like image 24
noxdafox Avatar answered Oct 07 '22 18:10

noxdafox