Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

python asyncio, how to create and cancel tasks from another thread

I have a python multi-threaded application. I want to run an asyncio loop in a thread and post calbacks and coroutines to it from another thread. Should be easy but I cannot get my head around the asyncio stuff.

I came up to the following solution which does half of what I want, feel free to comment on anything:

import asyncio from threading import Thread  class B(Thread):     def __init__(self):         Thread.__init__(self)         self.loop = None      def run(self):         self.loop = asyncio.new_event_loop()         asyncio.set_event_loop(self.loop) #why do I need that??         self.loop.run_forever()      def stop(self):         self.loop.call_soon_threadsafe(self.loop.stop)      def add_task(self, coro):         """this method should return a task object, that I           can cancel, not a handle"""         f = functools.partial(self.loop.create_task, coro)         return self.loop.call_soon_threadsafe(f)      def cancel_task(self, xx):         #no idea  @asyncio.coroutine def test():     while True:         print("running")         yield from asyncio.sleep(1)  b.start() time.sleep(1) #need to wait for loop to start t = b.add_task(test()) time.sleep(10) #here the program runs fine but how can I cancel the task?  b.stop() 

So starting and stoping the loop works fine. I thought about creating task using create_task, but that method is not threadsafe so I wrapped it in call_soon_threadsafe. But I would like to be able to get the task object in order to be able to cancel the task. I could do a complicated stuff using Future and Condition, but there must be a simplier way, isnt'it?

like image 975
Olivier RD Avatar asked Mar 27 '15 08:03

Olivier RD


People also ask

How do you cancel Asyncio tasks?

A Task is created and scheduled for its execution through the asyncio. create_task() function. Once scheduled, a Task can be requested for cancellation through task. cancel() method.

Does Asyncio use multiple threads?

When we utilize asyncio we create objects called coroutines. A coroutine can be thought of as executing a lightweight thread. Much like we can have multiple threads running at the same time, each with their own concurrent I/O operation, we can have many coroutines running alongside one another.

How do you stop a running event loop?

Run the event loop until stop() is called. If stop() is called before run_forever() is called, the loop will poll the I/O selector once with a timeout of zero, run all callbacks scheduled in response to I/O events (and those that were already scheduled), and then exit.


1 Answers

I think you may need to make your add_task method aware of whether or not its being called from a thread other than the event loop's. That way, if it's being called from the same thread, you can just call asyncio.async directly, otherwise, it can do some extra work to pass the task from the loop's thread to the calling thread. Here's an example:

import time import asyncio import functools from threading import Thread, current_thread, Event from concurrent.futures import Future  class B(Thread):     def __init__(self, start_event):         Thread.__init__(self)         self.loop = None         self.tid = None         self.event = start_event      def run(self):         self.loop = asyncio.new_event_loop()         asyncio.set_event_loop(self.loop)         self.tid = current_thread()         self.loop.call_soon(self.event.set)         self.loop.run_forever()      def stop(self):         self.loop.call_soon_threadsafe(self.loop.stop)      def add_task(self, coro):         """this method should return a task object, that I           can cancel, not a handle"""         def _async_add(func, fut):             try:                 ret = func()                 fut.set_result(ret)             except Exception as e:                 fut.set_exception(e)          f = functools.partial(asyncio.async, coro, loop=self.loop)         if current_thread() == self.tid:             return f() # We can call directly if we're not going between threads.         else:             # We're in a non-event loop thread so we use a Future             # to get the task from the event loop thread once             # it's ready.             fut = Future()             self.loop.call_soon_threadsafe(_async_add, f, fut)             return fut.result()      def cancel_task(self, task):         self.loop.call_soon_threadsafe(task.cancel)   @asyncio.coroutine def test():     while True:         print("running")         yield from asyncio.sleep(1)  event = Event() b = B(event) b.start() event.wait() # Let the loop's thread signal us, rather than sleeping t = b.add_task(test()) # This is a real task time.sleep(10) b.stop() 

First, we save the thread id of the event loop in the run method, so we can figure out if calls to add_task are coming from other threads later. If add_task is called from a non-event loop thread, we use call_soon_threadsafe to call a function that will both schedule the coroutine, and then use a concurrent.futures.Future to pass the task back to the calling thread, which waits on the result of the Future.

A note on cancelling a task: You when you call cancel on a Task, a CancelledError will be raised in the coroutine the next time the event loop runs. This means that the coroutine that the Task is wrapping will aborted due to the exception the next time it hit a yield point - unless the coroutine catches the CancelledError and prevents itself from aborting. Also note that this only works if the function being wrapped is actually an interruptible coroutine; an asyncio.Future returned by BaseEventLoop.run_in_executor, for example, can't really be cancelled, because it's actually wrapped around a concurrent.futures.Future, and those can't be cancelled once their underlying function actually starts executing. In those cases, the asyncio.Future will say its cancelled, but the function actually running in the executor will continue to run.

Edit: Updated the first example to use concurrent.futures.Future, instead of a queue.Queue, per Andrew Svetlov's suggestion.

Note: asyncio.async is deprecated since version 3.4.4 use asyncio.ensure_future instead.

like image 153
dano Avatar answered Sep 30 '22 07:09

dano