Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Retry task after task exception with asyncio.wait

I have multiple coroutines that should be run simultaneously, some of which may throw an exception. In those cases the coroutines should be run again. How do I accomplish this? Minimum demo of what I'm trying to do:

import asyncio
import time

t = time.time()


async def c1():
    print("finished c1 {}".format(time.time() - t))


async def c2():
    await asyncio.sleep(3)
    print("finished c2 {}".format(time.time() - t))


called = False


async def c3():
    global called
    # raises an exception the first time it's called
    if not called:
        called = True
        raise RuntimeError("c3 called the first time")
    print("finished c3 {}".format(time.time() - t))


async def run():
    pending = {c1(), c2(), c3()}

    num_times_called = 0
    while pending:
        num_times_called += 1
        print("{} times called with {} pending tasks: {}".format(num_times_called, len(pending), pending))

        finished, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_EXCEPTION)
        for task in finished:
            if task.exception():
                print("{} got an exception {}, retrying".format(task, task.exception()))
                pending.add(task)

        print("finished {}".format(finished))

    print("finished all {}".format(time.time() - t))


asyncio.get_event_loop().run_until_complete(run())

c3() represents that some coroutines will fail and need to be rerun. The problem with the demo is the finished task is finished and has exception set, so when I put it back onto the pending set, the next run loop exits immediately without rerunning c3() because it's already done.

Is there a way to clear the task so that it will run c3() again? I know that the coroutine instance attached to the task cannot be awaited on again else I get

RuntimeError('cannot reuse already awaited coroutine',)

which means I have to manually manage a map from coroutine instance to the coroutine that generated it, then retrieve the failed coroutine instance with task._coro - is this right?

like image 865
LemonPi Avatar asked Aug 03 '18 20:08

LemonPi


1 Answers

EDIT: the task itself could be the key in the map, which is cleaner.

async def run():
    tasks = {asyncio.ensure_future(c()): c for c in (c1, c2, c3)}
    pending = set(tasks.keys())

    num_times_called = 0
    while pending:
        num_times_called += 1
        print("{} times called with {} pending tasks: {}".format(num_times_called, len(pending), pending))

        finished, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_EXCEPTION)
        for task in finished:
            if task.exception():
                print("{} got an exception {}, retrying".format(task, task.exception()))
                coro = tasks[task]
                new_task = asyncio.ensure_future(coro())
                tasks[new_task] = coro
                pending.add(new_task)

        print("finished {}".format(finished))

    print("finished all {}".format(time.time() - t))
like image 88
LemonPi Avatar answered Sep 22 '22 06:09

LemonPi