asyncio: speculatively await multiple futures

TL;DR

How can I await any one future from a collection of futures, optionally notifying the other futures that "they are no longer needed"?


Here's why I need this. I want to create a special kind of future / task that does timekeeping and may be used with other futures / tasks to cancel them if they cumulatively exceed some timeout (or are forced to stop after interacting with this timekeeping task). If you are familiar with Go, it has a similar concept called Context.

To make this even more concrete, imagine a typical HTTP client. To request a page from a URL, it needs to perform several operations in succession, any of which could block forever. For example, these operations could be:

  1. Allocate a socket.
  2. Connect to server.
  3. Retrieve page in several chunks.
  4. Close connection.
  5. Deallocate socket.

Suppose you allow the entire operation to take a minute. You also know that allocating a socket should take no more than a millisecond, while connecting may take up to a minute, and so may retrieving the chunks. Disconnecting and deallocating resources should take milliseconds.

Now suppose you are made to wait the full timeout at every step: you have used more than twice your quota. So you need to pass the remaining time budget from each call to its successor. Also, suppose you couldn't deallocate the socket. That's no big deal, since the application may recover from this error, so you also need to distinguish between kinds of timeouts. I imagine this could be written like so (in some imaginary version of Python):

async def http_request(context, url):
    socket = await min(allocate_socket(), context.timeout, socket_timeout)
    await min(socket.connect(), context.timeout, connect_timeout)
    async for chunk in min(socket.receive(), context.timeout, chunk_timeout):
        print(chunk)
    await min(socket.close(), context.timeout, close_timeout)
asked Oct 29 '22 00:10 by wvxvw


1 Answer

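async_timeout is exactly what you need; your code would look like this:

from async_timeout import timeout


async def http_request(url):
    # Outer deadline: the whole request must finish within timeout_for_all.
    async with timeout(timeout_for_all):

        # Each inner block adds its own per-step limit; whichever deadline
        # (inner or outer) expires first ends the block with asyncio.TimeoutError.
        async with timeout(socket_timeout):
            socket = await allocate_socket()

        async with timeout(connect_timeout):
            await socket.connect()

        # Note: this limits the whole receive loop, not each individual chunk.
        async with timeout(chunk_timeout):
            async for chunk in socket.receive():
                print(chunk)

        async with timeout(close_timeout):
            await socket.close()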
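If you'd rather not depend on a third-party package, the question's imaginary min(coro, context.timeout, step_timeout) can be approximated with the standard library's asyncio.wait_for. This is a sketch under stated assumptions, not the async_timeout API: Context, run(), BudgetExceeded and StepTimeout are hypothetical names, and asyncio.sleep() stands in for the real network steps. It keeps one deadline on the event loop's clock, gives each step the smaller of its own limit and the remaining budget, and raises a different exception depending on which limit was hit, so a slow socket close can be told apart from the whole request running out of time.

```python
import asyncio


class BudgetExceeded(asyncio.TimeoutError):
    """Hypothetical: the shared, context-wide budget ran out."""


class StepTimeout(asyncio.TimeoutError):
    """Hypothetical: a single step exceeded its own limit."""


class Context:
    """Hypothetical Go-like context: one deadline shared by many awaits.

    Must be created inside a running coroutine (it needs the running loop).
    """

    def __init__(self, budget):
        self._loop = asyncio.get_running_loop()
        self.deadline = self._loop.time() + budget

    def remaining(self):
        return max(0.0, self.deadline - self._loop.time())

    async def run(self, coro, step_timeout):
        # The "min" from the question: the step's own limit or whatever
        # is left of the shared budget, whichever is smaller.
        remaining = self.remaining()
        limit = min(step_timeout, remaining)
        try:
            return await asyncio.wait_for(coro, limit)
        except asyncio.TimeoutError:
            # Caveat of this sketch: a TimeoutError raised by the awaited
            # coroutine itself would also be classified here.
            if limit == remaining:
                raise BudgetExceeded() from None
            raise StepTimeout() from None


async def demo():
    ctx = Context(budget=0.3)
    outcomes = []
    # Fast step: finishes well within both limits.
    outcomes.append(await ctx.run(asyncio.sleep(0.01, result="ok"), step_timeout=0.1))
    # Slow step with a tight per-step limit: the step limit fires first.
    try:
        await ctx.run(asyncio.sleep(1), step_timeout=0.05)
    except StepTimeout:
        outcomes.append("step")
    # Generous step limit, but only ~0.24 s of budget is left: the budget fires.
    try:
        await ctx.run(asyncio.sleep(1), step_timeout=10)
    except BudgetExceeded:
        outcomes.append("budget")
    return outcomes


print(asyncio.run(demo()))  # ['ok', 'step', 'budget']
```

The distinct exception types let the caller decide which timeouts are fatal: for example, catching StepTimeout around the close step while letting BudgetExceeded propagate.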

Let's examine the issues you named.

A Go-style Context can also have a cancel() method, which allows canceling the process from the outside regardless of the time spent waiting.

asyncio has a way to cancel any running task, regardless of timeouts or anything else: call the task's cancel() method (asyncio.CancelledError will be raised inside it), then await the task so the cancellation propagates, usually suppressing the exception:

import asyncio
from contextlib import suppress

task.cancel()
with suppress(asyncio.CancelledError):
    await task

This is the standard way to cancel things before they are done. You don't need anything more complex than that.
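A minimal, self-contained sketch of this pattern: worker() and main() are hypothetical names, and asyncio.sleep(3600) merely stands in for an operation that would otherwise run for a long time. The worker catches CancelledError only to clean up, then re-raises it so the task really ends up in the cancelled state.

```python
import asyncio
from contextlib import suppress


async def worker(log):
    try:
        await asyncio.sleep(3600)  # stands in for a long-running operation
    except asyncio.CancelledError:
        log.append("cleaned up")  # a chance to release resources
        raise  # re-raise so the task is actually marked as cancelled


async def main():
    log = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0.01)  # let the worker start and reach its await
    task.cancel()  # request cancellation from the outside
    with suppress(asyncio.CancelledError):
        await task  # wait until the cancellation has been processed
    return task.cancelled(), log


print(asyncio.run(main()))  # (True, ['cleaned up'])
```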

Also, it could expire based on either the wall clock or an internal timer.

I'm not sure I understood this, but async_timeout gives you exactly what you want: a way to limit a task's execution to a concrete amount of time.
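On the wall-clock question: the event loop schedules its timers on loop.time(), which by default is based on the monotonic clock (time.monotonic()), not on wall-clock time. If you do want an absolute wall-clock deadline, one option is to convert it into a relative timeout when the wait starts. run_until_wall_clock() below is a hypothetical helper sketching that idea with the standard asyncio.wait_for; note that a system clock change after the conversion is not taken into account.

```python
import asyncio
import time


async def run_until_wall_clock(coro, wall_deadline):
    """Hypothetical helper: enforce an absolute wall-clock deadline.

    wall_deadline is a time.time() timestamp. The event loop's timers use a
    monotonic clock, so the deadline is converted into a relative delay once,
    at the moment the wait starts.
    """
    delay = max(0.0, wall_deadline - time.time())
    return await asyncio.wait_for(coro, delay)


async def main():
    # Finishes in time: returns the coroutine's result.
    result = await run_until_wall_clock(asyncio.sleep(0.01, result="ok"), time.time() + 1)
    print(result)
    # Misses the deadline: asyncio.TimeoutError is raised.
    try:
        await run_until_wall_clock(asyncio.sleep(1), time.time() + 0.05)
    except asyncio.TimeoutError:
        print("deadline passed")


asyncio.run(main())
```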

Also, I'm afraid that if this is not implemented directly in asyncio as a separate thread, then it will have to wait for the scheduled / blocking coroutine to finish regardless of the timeout (it will only be able to cancel the execution if the executed coroutine goes to sleep).

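The asyncio module itself was, in a sense, created to avoid using multiple threads. Ideally, your async program should use many coroutines managed by a single event loop inside a single thread. Keep in mind, though, that cancellation (and therefore any timeout) can only take effect at an await point: a coroutine that blocks the thread, for example with time.sleep() or a long CPU-bound loop, holds up the whole event loop until it returns, so such work should be moved off the loop, e.g. with loop.run_in_executor().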
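As a sketch of the blocking case: if a synchronous call such as time.sleep() ran directly inside a coroutine, the loop could not fire the timeout until the call returned. Running it through loop.run_in_executor() keeps the loop free, so the timeout fires on schedule. blocking_io() is a hypothetical stand-in for a synchronous library call. The worker thread itself is not interrupted; the caller simply stops waiting for it.

```python
import asyncio
import time


def blocking_io():
    """Hypothetical blocking call (e.g. from a synchronous library)."""
    time.sleep(0.5)  # blocks whichever thread runs it
    return "done"


async def main():
    loop = asyncio.get_running_loop()
    start = loop.time()
    try:
        # The blocking call runs in the loop's default thread pool, so the
        # event loop itself stays free and the timeout fires on schedule.
        await asyncio.wait_for(loop.run_in_executor(None, blocking_io), timeout=0.1)
    except asyncio.TimeoutError:
        print("gave up waiting")
    # Roughly 0.1 s, not 0.5 s. The worker thread is NOT stopped, though:
    # it keeps running in the background until time.sleep() returns.
    return loop.time() - start


elapsed = asyncio.run(main())
```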

This event loop makes sure things happen at the time they should. The following code raises asyncio.TimeoutError after 1 second of running:

async with timeout(1):
    await asyncio.sleep(20)
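To try this without installing async_timeout, the same behaviour can be reproduced with the standard library: asyncio.wait_for(aw, timeout) raises asyncio.TimeoutError once the timeout passes, and Python 3.11+ also offers an equivalent context manager, asyncio.timeout(). The sketch below uses wait_for so it runs on older versions too; main() is a hypothetical wrapper that also measures how long the wait took.

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    start = loop.time()
    try:
        # Same effect as `async with timeout(1): await asyncio.sleep(20)`.
        await asyncio.wait_for(asyncio.sleep(20), timeout=1)
    except asyncio.TimeoutError:
        print(f"timed out after {loop.time() - start:.2f} s")
    return loop.time() - start


elapsed = asyncio.run(main())
```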

Update:

A different example would be when I need to wait for multiple workers to finish a certain task, when I only care about one of them completing it, but I don't care about timeout at all.

This can also be done with standard asyncio functionality:

import asyncio
from contextlib import suppress


async def main():
    # Start 3 concurrent tasks (workers); coro() is your worker coroutine:
    task_1 = asyncio.ensure_future(coro())
    task_2 = asyncio.ensure_future(coro())
    task_3 = asyncio.ensure_future(coro())

    # Wait for the first of them to finish:
    tasks = (task_1, task_2, task_3,)
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print('done', done.pop().result())

    # Cancel the others since they're still running,
    # but we don't need them to finish:
    for task in pending:
        task.cancel()
        with suppress(asyncio.CancelledError):
            await task


asyncio.run(main())
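The same pattern can be wrapped into a reusable helper. This is a sketch, not a library API: first_completed(), demo() and worker() are hypothetical names, and the delays are made up. Cancellation doubles as the "no longer needed" notification from the question: a worker can catch asyncio.CancelledError to react to it, as long as it re-raises. asyncio.gather(..., return_exceptions=True) is used to wait until every cancelled task has actually finished, without the CancelledError results being raised.

```python
import asyncio


async def first_completed(*coros):
    """Return the first result and cancel the remaining workers."""
    tasks = [asyncio.ensure_future(c) for c in coros]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    # Wait for the cancellations to be processed before returning.
    await asyncio.gather(*pending, return_exceptions=True)
    return done.pop().result()


async def demo():
    cancelled = []

    async def worker(name, delay):
        try:
            await asyncio.sleep(delay)  # pretend to work for `delay` seconds
            return name
        except asyncio.CancelledError:
            cancelled.append(name)  # the "no longer needed" notification
            raise

    winner = await first_completed(worker("a", 0.3), worker("b", 0.05), worker("c", 0.2))
    return winner, sorted(cancelled)


print(asyncio.run(demo()))  # ('b', ['a', 'c'])
```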
answered Nov 11 '22 14:11 by Mikhail Gerasimov