
How can I schedule a task with asyncio to run in an executor?

I have been using asyncio for concurrency, but I have run into a problem. I need to schedule a task with asyncio, but the task is blocking, so I would like to execute it in an executor using a thread pool from concurrent.futures.

I have seen examples like this one, which schedule tasks:

now = loop.time()
loop.call_at(now + 60, callback, arg, loop)

and examples like this one, which run tasks in executors:

blocking_tasks = [
    loop.run_in_executor(executor, blocks)
    for i in range(6)
]
completed, pending = await asyncio.wait(blocking_tasks)

But how can I schedule a blocking task to run in an executor?

Brian asked Mar 07 '23 07:03


2 Answers

run_in_executor returns a future, so you can't use it with call_at, which requires an ordinary function. However, you can easily postpone execution using asyncio.sleep():

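import asyncio

async def my_task():
    # Wait without blocking the event loop; the task can be cancelled here.
    await asyncio.sleep(60)
    loop = asyncio.get_running_loop()
    # fn is the blocking function; None selects the default executor.
    result = await loop.run_in_executor(None, fn)
    ...

taskobj = loop.create_task(my_task())  # loop: the event loop from the question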

This has the advantage that the task created by create_task can be canceled during the sleep. Also, you can return a useful value from my_task() and obtain it using await taskobj, by calling taskobj.result() once the task has finished, or via loop.run_until_complete(taskobj).
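
For illustration, here is a minimal runnable sketch of the sleep-then-executor pattern, covering both retrieving the result and cancelling during the sleep. The names run_later and blocking_job are hypothetical, and the delays are shortened so the example finishes quickly; it assumes Python 3.7+ for asyncio.run and asyncio.create_task.

```python
import asyncio
import time


def blocking_job(x):
    # Hypothetical stand-in for the blocking function (fn in the answer).
    time.sleep(0.05)
    return x * 2


async def run_later(delay, func, *args):
    # Sleep without blocking the event loop, then run func in the default executor.
    await asyncio.sleep(delay)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, func, *args)


async def main():
    # A scheduled task that is allowed to finish; awaiting it yields the return value.
    task = asyncio.create_task(run_later(0.1, blocking_job, 21))
    result = await task

    # A scheduled task cancelled while it is still sleeping; blocking_job never starts.
    doomed = asyncio.create_task(run_later(60, blocking_job, 1))
    await asyncio.sleep(0)  # give the task a chance to start and reach its sleep
    doomed.cancel()
    try:
        await doomed
    except asyncio.CancelledError:
        pass
    return result, doomed.cancelled()


if __name__ == "__main__":
    print(asyncio.run(main()))  # (42, True)
```

Note that cancellation only prevents work that has not yet been handed to the executor; once the blocking function is running in a worker thread, cancelling the task does not interrupt that thread.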

user4815162342 answered Mar 25 '23 09:03


You can create a wrapper like this:

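import asyncio
import functools

def run_in_async_loop(f):
    @functools.wraps(f)
    async def wrapped(*args, **kwargs):
        loop = asyncio.get_running_loop()
        # Pass a callable, not f's result, so that f runs in the executor thread.
        # run_in_executor takes no keyword arguments, hence functools.partial.
        return await loop.run_in_executor(None, functools.partial(f, *args, **kwargs))
    return wrapped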
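
As a usage sketch, a decorator of this kind might be applied as follows. run_in_executor expects a callable plus positional arguments, so this version bundles the arguments with functools.partial instead of calling f directly. The names run_in_executor_thread and slow_add are hypothetical, and the delay before the call is only there to mimic scheduling.

```python
import asyncio
import functools
import threading
import time


def run_in_executor_thread(f):
    # Hypothetical decorator name; turns a blocking function into an awaitable one.
    @functools.wraps(f)
    async def wrapped(*args, **kwargs):
        loop = asyncio.get_running_loop()
        # partial defers the call, so f executes in the worker thread.
        call = functools.partial(f, *args, **kwargs)
        return await loop.run_in_executor(None, call)
    return wrapped


@run_in_executor_thread
def slow_add(a, b=0):
    time.sleep(0.05)  # simulated blocking work
    # Also report whether this code ran on the main thread.
    return a + b, threading.current_thread() is threading.main_thread()


async def main():
    await asyncio.sleep(0.1)  # postpone the call without blocking the loop
    return await slow_add(1, b=2)


if __name__ == "__main__":
    print(asyncio.run(main()))  # (3, False)
```

The False in the result indicates that slow_add ran in an executor thread and not on the thread that drives the event loop.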

binrebin answered Mar 25 '23 08:03