 

Scheduling an asyncio coroutine from another thread

I am trying to schedule an asyncio coroutine from another thread using create_task(). The problem is that the coroutine is not called, at least not within a reasonable amount of time.

Is there a way to wake up the event loop, or at least to specify a shorter timeout?

#!/usr/bin/python3

import asyncio, threading

event_loop = None

async def coroutine():
    print("coroutine called")

def scheduler():
    print("scheduling...")
    event_loop.create_task(coroutine())
    threading.Timer(2, scheduler).start()

def main():
    global event_loop

    threading.Timer(2, scheduler).start()

    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    event_loop.run_forever()

main()

Output:

scheduling...
scheduling...
scheduling...
scheduling...
kfx asked Jun 15 '16 16:06


2 Answers

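According to the documentation of Task, "this class is not thread safe", so scheduling from another thread is not expected to work. Calling create_task() from another thread queues the task but does not wake the event loop, which may stay blocked waiting for I/O or timers.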

I found two solutions for this based on the answers and comments here.

  1. @wind85's answer: replace the create_task call with asyncio.run_coroutine_threadsafe(coroutine(), event_loop). Requires Python 3.5.1 or later.

  2. Use call_soon_threadsafe to schedule a callback, which then creates the task:

    def do_create_task():
        event_loop.create_task(coroutine())

    def scheduler():
        event_loop.call_soon_threadsafe(do_create_task)
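
Both solutions can be tried side by side. The following is only a minimal sketch, not the code from the question: to make it terminate on its own, it assumes the loop runs in a background thread (the question runs it in the main thread), and the names demo, run_loop, label and done are hypothetical helpers introduced for illustration.

```python
import asyncio
import threading


async def coroutine(label, done):
    print("coroutine called via", label)
    done.set()  # threading.Event: tell the scheduling thread we ran


def run_loop(loop):
    # Runs in the background thread and owns the event loop.
    asyncio.set_event_loop(loop)
    loop.run_forever()


def demo():
    loop = asyncio.new_event_loop()
    loop_thread = threading.Thread(target=run_loop, args=(loop,), daemon=True)
    loop_thread.start()

    # Solution 1: hand the coroutine object to the loop in a thread-safe way.
    done1 = threading.Event()
    asyncio.run_coroutine_threadsafe(
        coroutine("run_coroutine_threadsafe", done1), loop)

    # Solution 2: schedule a plain callback; the task itself is then
    # created inside the loop's own thread.
    done2 = threading.Event()
    tasks = []  # keep a reference to the task until the demo ends

    def do_create_task():
        tasks.append(loop.create_task(coroutine("call_soon_threadsafe", done2)))

    loop.call_soon_threadsafe(do_create_task)

    # Wait (with a timeout) until both coroutines have actually run.
    ok = done1.wait(timeout=5) and done2.wait(timeout=5)

    # Stopping the loop from outside must also go through call_soon_threadsafe.
    loop.call_soon_threadsafe(loop.stop)
    loop_thread.join(timeout=5)
    loop.close()
    return ok


if __name__ == "__main__":
    demo()
```

In both variants the loop is woken up as soon as the work is submitted, so the coroutine runs promptly instead of waiting for the loop's next timeout.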
    
kfx answered Oct 28 '22 04:10


Here you go, this should work. It is a port of run_coroutine_threadsafe for older Python versions. Try it out: since I have the latest version, I can't guarantee that it will work on yours.

#!/usr/bin/python3
import asyncio
import concurrent.futures
import threading
from asyncio import coroutines, futures

def run_coroutine_threadsafe_my(coro, loop):
    """Submit a coroutine object to a given event loop.
        Return a concurrent.futures.Future to access the result.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')
    future = concurrent.futures.Future()

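    def callback():
        # Runs inside the loop's thread: wrap the coroutine in a task and
        # mirror its outcome onto the concurrent.futures.Future.
        # Note: futures._chain_future is a private asyncio helper.
        try:
            futures._chain_future(asyncio.ensure_future(coro, loop=loop), future)
        except Exception as exc:
            if future.set_running_or_notify_cancel():
                future.set_exception(exc)
            raise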

    loop.call_soon_threadsafe(callback)
    return future




event_loop = None

async def coro():
    print("coroutine called")

def scheduler():
    print("scheduling...")
    run_coroutine_threadsafe_my(coro(), event_loop)
    threading.Timer(2, scheduler).start()

def main():
    global event_loop

    threading.Timer(2, scheduler).start()

    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    event_loop.run_forever()

main()
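
On Python 3.5.1 and later the built-in asyncio.run_coroutine_threadsafe does the same job, and the concurrent.futures.Future it returns lets the calling thread collect the coroutine's result or exception. A minimal sketch, assuming the loop runs in a background thread; add, fail and demo are hypothetical names used only for this example:

```python
import asyncio
import threading


async def add(a, b):
    await asyncio.sleep(0.01)
    return a + b


async def fail():
    raise ValueError("boom")


def demo():
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    try:
        # result() blocks the calling thread until the coroutine finishes.
        future = asyncio.run_coroutine_threadsafe(add(2, 3), loop)
        total = future.result(timeout=5)

        # An exception raised in the coroutine is re-raised by result().
        failing = asyncio.run_coroutine_threadsafe(fail(), loop)
        try:
            failing.result(timeout=5)
            error = None
        except ValueError as exc:
            error = str(exc)
    finally:
        # Stop the loop from outside in a thread-safe way, then clean up.
        loop.call_soon_threadsafe(loop.stop)
        thread.join(timeout=5)
        loop.close()
    return total, error


if __name__ == "__main__":
    print(demo())
```

Passing a timeout to result() is a design choice here: it keeps the calling thread from blocking forever if the loop has stopped or the coroutine hangs.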
wind85 answered Oct 28 '22 04:10