I try to schedule an asyncio coroutine from another thread using create_task()
. The problem is that the coroutine is not called, at least not in reasonable amount of time.
Is there are way to wake up the event loop or at least specify a shorter timeout?
#!/usr/bin/python3
import asyncio, threading
event_loop = None
@asyncio.coroutine
def coroutine():
print("coroutine called")
def scheduler():
print("scheduling...")
event_loop.create_task(coroutine())
threading.Timer(2, scheduler).start()
def main():
global event_loop
threading.Timer(2, scheduler).start()
event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(event_loop)
event_loop.run_forever()
main()
Output:
scheduling...
scheduling...
scheduling...
scheduling...
According to the documentation of Task "this class is not thread safe". So scheduling from another thread is not expected to work.
I found two solutions for this based on the answers and comments here.
@wind85 answer: directly replacing the create_task
line call with asyncio.run_coroutine_threadsafe(coroutine(), event_loop)
call. Requires Python 3.5.1.
Use call_soon_threadsafe
to schedule a callback, which then creates the task:
def do_create_task():
eventLoop.create_task(coroutine())
def scheduler():
eventLoop.call_soon_threadsafe(do_create_task)
Here we go this shuold work. It's a port. Try it out since I have the latest version, I can't really assure you it will work.
#!/usr/bin/python3
import concurrent.futures
import threading, asyncio
from asyncio import coroutines, futures
def run_coroutine_threadsafe_my(coro, loop):
"""Submit a coroutine object to a given event loop.
Return a concurrent.futures.Future to access the result.
"""
if not coroutines.iscoroutine(coro):
raise TypeError('A coroutine object is required')
future = concurrent.futures.Future()
def callback():
try:
futures._chain_future(asyncio.ensure_future(coro, loop=loop), future)
except Exception as exc:
if future.set_running_or_notify_cancel():
future.set_exception(exc)
raise
loop.call_soon_threadsafe(callback)
return future
event_loop = None
@asyncio.coroutine
async def coro():
print("coroutine called")
def scheduler():
print("scheduling...")
run_coroutine_threadsafe_my(coro(),event_loop)
threading.Timer(2, scheduler).start()
def main():
global event_loop
threading.Timer(2, scheduler).start()
event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(event_loop)
event_loop.run_forever()
main()
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With