
How to schedule and cancel tasks with asyncio

I am writing a client-server application. While connected, the client sends the server a "heartbeat" signal, for example once per second. On the server side I need a mechanism for adding tasks (or coroutines, or something else) to be executed asynchronously. Moreover, I want a client's tasks to be canceled when that client stops sending the "heartbeat" signal.

In other words, when the server starts a task, the task has a kind of timeout or TTL, for example 3 seconds. When the server receives the "heartbeat" signal, it resets the timer for another 3 seconds, until the task is done or the client disconnects (stops sending the signal).

Here is an example of canceling a task from the asyncio tutorial on pymotw.com. But here the task is canceled before the event loop starts, which is not suitable for me.

import asyncio

async def task_func():
    print('in task_func')
    return 'the result'


event_loop = asyncio.get_event_loop()
try:
    print('creating task')
    task = event_loop.create_task(task_func())

    print('canceling task')
    task.cancel()

    print('entering event loop')
    event_loop.run_until_complete(task)
    print('task: {!r}'.format(task))
except asyncio.CancelledError:
    print('caught error from cancelled task')
else:
    print('task result: {!r}'.format(task.result()))
finally:
    event_loop.close()
Sergey Belash asked Oct 13 '16 08:10





1 Answer

You can use asyncio's Task wrapper to run a coroutine via the ensure_future() function.

ensure_future() automatically wraps your coroutine in a Task and schedules it on the event loop. The Task then ensures that the coroutine 'cranks over' from one await statement to the next (or until the coroutine finishes).

In other words, just pass a regular coroutine to ensure_future() and assign the resulting Task object to a variable. You can then call Task.cancel() when you need to stop it. cancel() only requests cancellation: a CancelledError is raised inside the coroutine at its next await point.

import asyncio

# Placeholder flag: in a real server this would be updated by whatever
# code receives the client's heartbeat.
heartbeat = False

async def some_db_or_long_running_background_coroutine():
    # Placeholder for the real long-running work.
    await asyncio.sleep(3600)

async def task_func():
    print('in task_func')
    # if the task needs to run for a while you'll need an await statement
    # to provide a pause point so that other coroutines can run in the meantime
    await some_db_or_long_running_background_coroutine()
    # or if this is a one-off thing, then return the result,
    # but then you don't really need a Task wrapper...
    # return 'the result'

async def my_app():
    my_task = None
    while True:
        # yield to the event loop so that other coroutines get a turn
        await asyncio.sleep(0)

        # listen for trigger / heartbeat
        if heartbeat and my_task is None:
            my_task = asyncio.ensure_future(task_func())

        # also listen for termination of heartbeat / connection
        elif not heartbeat and my_task is not None:
            if my_task.done():
                # finished or cancelled: forget it so it can be restarted
                my_task = None
            else:
                my_task.cancel()

# my_app() loops forever, so this call blocks until interrupted
asyncio.run(my_app())
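
To see the cancellation on its own, here is a minimal sketch that runs end to end. It assumes Python 3.7+ (for asyncio.run); worker, main and log are illustrative names, and the short sleeps only stand in for real work.

```python
import asyncio

async def worker(log):
    # Hypothetical long-running job; the sleep stands in for real awaits.
    try:
        while True:
            log.append("tick")
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        # Clean-up hook: runs when cancel() is delivered at an await point.
        log.append("cancelled")
        raise  # re-raise so the Task is marked as cancelled

async def main():
    log = []
    task = asyncio.ensure_future(worker(log))
    await asyncio.sleep(0.05)  # let the task run for a while
    task.cancel()              # request cancellation while the loop is running
    try:
        await task             # wait until the cancellation has been processed
    except asyncio.CancelledError:
        pass
    return task.cancelled(), log

cancelled, log = asyncio.run(main())
```

Awaiting the task after cancel() is what lets the caller know the coroutine has actually stopped; cancel() by itself returns immediately.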

Note that tasks are meant for long-running work that needs to continue in the background without interrupting the main flow. If all you need is a quick one-off call, just await the coroutine directly instead.
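
The timer-reset behaviour described in the question (a TTL that each heartbeat refreshes) could be sketched with loop.call_later(), which returns a handle that can be canceled and rescheduled. This is only one possible approach, assuming Python 3.7+; HeartbeatGuard, beat, job and demo are hypothetical names, and the TTL is shortened to 0.2 seconds so the demo finishes quickly.

```python
import asyncio

class HeartbeatGuard:
    """Cancels `task` if beat() is not called within `ttl` seconds."""

    def __init__(self, task, ttl):
        self._task = task
        self._ttl = ttl
        self._loop = asyncio.get_running_loop()
        # Start the first countdown; task.cancel fires if it expires.
        self._handle = self._loop.call_later(ttl, task.cancel)
        # Stop the countdown once the task ends for any reason.
        task.add_done_callback(lambda _: self._handle.cancel())

    def beat(self):
        # Called for every heartbeat: drop the old timer, start a new one.
        self._handle.cancel()
        self._handle = self._loop.call_later(self._ttl, self._task.cancel)

async def job():
    await asyncio.sleep(3600)  # stands in for real long-running work

async def demo():
    task = asyncio.ensure_future(job())
    guard = HeartbeatGuard(task, ttl=0.2)
    # Simulated client: three heartbeats 0.02 s apart, then silence.
    for _ in range(3):
        await asyncio.sleep(0.02)
        guard.beat()
    alive_during_heartbeats = not task.done()
    await asyncio.sleep(0.5)   # no more heartbeats, so the TTL expires
    return alive_during_heartbeats, task.cancelled()

alive, cancelled = asyncio.run(demo())
```

In a real server, beat() would be called from whatever handler receives the client's heartbeat message, with one guard per client task.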

songololo answered Oct 02 '22 20:10
