Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Async for loop on AsyncGenerator

Having an asynchronous generator I would expect to be able to iterate through it asynchronously. However, I am missing something or messing something up or both as I end up with a regular synchronous for loop in the end:

import asyncio


async def time_consuming(t):
    print(f"Going to sleep for {t} seconds")
    await asyncio.sleep(t)
    print(f"Slept {t} seconds")
    return t


async def generator():
    for i in range(4, 0, -1):
        yield await time_consuming(i)


async def consumer():
    async for t in generator():
        print(f"Doing something with {t}")


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    loop.run_until_complete(consumer())
    loop.close()

This will take about 12 seconds to run and return this:

Going to sleep for 4 seconds
Slept 4 seconds
Doing something with 4
Going to sleep for 3 seconds
Slept 3 seconds
Doing something with 3
Going to sleep for 2 seconds
Slept 2 seconds
Doing something with 2
Going to sleep for 1 seconds
Slept 1 seconds
Doing something with 1

Though I expected it to take about 4 seconds to run and return something like this:

Going to sleep for 4 seconds
Going to sleep for 3 seconds
Going to sleep for 2 seconds
Going to sleep for 1 seconds
Slept 4 seconds
Doing something with 4
Slept 3 seconds
Doing something with 3
Slept 2 seconds
Doing something with 2
Slept 1 seconds
Doing something with 1
like image 296
Rodrigo Martins de Oliveira Avatar asked Oct 17 '18 13:10

Rodrigo Martins de Oliveira


People also ask

Can a for loop be async?

For loops. Combining async with a for (or a for...of ) loop is possibly the most straightforward option when performing asynchronous operations over array elements. Using await inside a for loop will cause the code to stop and wait for the asynchronous operation to complete before continuing.

How do I iterate through async generator?

The syntax is simple: prepend function* with async . That makes the generator asynchronous. And then use for await (...) to iterate over it, like this: async function* generateSequence(start, end) { for (let i = start; i <= end; i++) { // Wow, can use await!

Does async await block for loop?

Though it creates a confusion, in reality async and await will not block the JavaScript main thread. Like mentioned above they are just syntactic sugars for promise chaining. Putting other way both code snippets below are same.

Are generator functions asynchronous?

A generator function (ECMAScript 2015) in JavaScript is a special type of synchronous function which is able to stop and resume its execution at will.


1 Answers

An asynchronous generator does not mean that you execute iteration concurrently! All that you gain is more places for the coroutine to yield to other tasks. The iteration steps still run in series.

Put differently: an asynchronous iterator is useful for an iterator that needs to use I/O to obtain each iteration step. Think looping over the results of a web socket, or lines in a file. If each next() step over the iterator requires waiting for a slow I/O source to provide data, that's a good point to yield control to something else that has been set to run concurrently.

If you expected each individual step of your generator to be run concurrently, then you still would have to schedule additional tasks, explicitly, with the event loop.

You can then return from the generator when all those extra tasks have completed. If you scheduled your 4 time_consuming() coroutines as tasks, use asyncio.wait() to wait for one or all of the tasks to complete, and yield results from tasks that are done, then yes, after your for i in range(...): loop is complete, your process would only take 4 seconds in total:

async def generator():
    pending = []
    for i in range(4, 0, -1):
        pending.append(asyncio.create_task(time_consuming(i)))

    while pending:
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            yield task.result()

at which point the output becomes

Going to sleep for 4 seconds
Going to sleep for 3 seconds
Going to sleep for 2 seconds
Going to sleep for 1 seconds
Slept 1 seconds
Doing something with 1
Slept 2 seconds
Doing something with 2
Slept 3 seconds
Doing something with 3
Slept 4 seconds
Doing something with 4

Note that this is the reverse order from your expected output, because this takes task results as they complete rather than wait for the first task to be created to complete. Usually this is what you want, really. Why wait for 4 seconds when you already have a result ready after 1?

You can have your variant too, of a sort, but you'd just code that up differently. Then you can just use asyncio.gather() on the 4 tasks, which schedules a bunch of coroutines to run as concurrent tasks, and return their results as a list, after which you can yield those results:

async def generator():
    tasks = []
    for i in range(4, 0, -1):
        tasks.append(time_consuming(i))

    for res in await asyncio.gather(*tasks):
        yield res 

but now the output becomes

Going to sleep for 4 seconds
Going to sleep for 3 seconds
Going to sleep for 2 seconds
Going to sleep for 1 seconds
Slept 1 seconds
Slept 2 seconds
Slept 3 seconds
Slept 4 seconds
Doing something with 4
Doing something with 3
Doing something with 2
Doing something with 1

because we can't do anything further until the longest task, time_consuming(4), has completed, yet the shorter-running tasks complete before that point and already output their Slept ... seconds message.

like image 133
Martijn Pieters Avatar answered Oct 22 '22 04:10

Martijn Pieters