
Start async task now, await later

C# programmer trying to learn some Python. I am trying to run a CPU intensive calc while letting an IO bound async method quietly chug away in the background. In C#, I would typically set the awaitable going, then kick off the CPU intensive code, then await the IO task, then combine results.

Here's how I'd do it in C#

static async Task DoStuff() {
    var ioBoundTask = DoIoBoundWorkAsync();
    int cpuBoundResult = DoCpuIntensiveCalc();
    int ioBoundResult = await ioBoundTask.ConfigureAwait(false);

    Console.WriteLine($"The result is {cpuBoundResult + ioBoundResult}");
}

static async Task<int> DoIoBoundWorkAsync() {
    Console.WriteLine("Make API call...");
    await Task.Delay(2500).ConfigureAwait(false); // non-blocking async call
    Console.WriteLine("Data back.");
    return 1;
}

static int DoCpuIntensiveCalc() {
    Console.WriteLine("Do smart calc...");
    Thread.Sleep(2000);  // blocking call. e.g. a spinning loop
    Console.WriteLine("Calc finished.");
    return 2;
}

And here's the equivalent code in python

import time
import asyncio

async def do_stuff():
    ioBoundTask = do_iobound_work_async()
    cpuBoundResult = do_cpu_intensive_calc()
    ioBoundResult = await ioBoundTask
    print(f"The result is {cpuBoundResult + ioBoundResult}")

async def do_iobound_work_async(): 
    print("Make API call...")
    await asyncio.sleep(2.5)  # non-blocking async call
    print("Data back.")
    return 1

def do_cpu_intensive_calc():
    print("Do smart calc...")
    time.sleep(2)  # blocking call. e.g. a spinning loop
    print("Calc finished.")
    return 2

await do_stuff()  # top-level await works in Jupyter/IPython; in a plain script use asyncio.run(do_stuff())

Importantly, please note that the CPU intensive task is represented by a blocking sleep that cannot be awaited and the IO bound task is represented by a non-blocking sleep that is awaitable.

This takes 2.5 seconds to run in C# and 4.5 seconds in Python. The difference is that C# starts running the asynchronous method straight away, whereas Python only starts the method when it hits the await. The output below confirms this. How can I achieve the desired result? Code that works in Jupyter Notebook would be appreciated if at all possible.

--- C# ---
Make API call...
Do smart calc...
Calc finished.
Data back.
The result is 3
--- Python ---
Do smart calc...
Calc finished.
Make API call...
Data back.
The result is 3

Update 1

Inspired by knh190's answer, it seems that I can get most of the way there using asyncio.create_task(...). This achieves the desired result (2.5 secs): first, the asynchronous code is set running; next, the blocking CPU code is run synchronously; third, the asynchronous code is awaited; finally, the results are combined. To get the asynchronous call to actually start running, I had to put an await asyncio.sleep(0) in, which feels like a horrible hack. Can we set the task running without doing this? There must be a better way...

async def do_stuff():
    task = asyncio.create_task(do_iobound_work_async())
    await asyncio.sleep(0)  #   <~~~~~~~~~ This hacky line sets the task running

    cpuBoundResult = do_cpu_intensive_calc()
    ioBoundResult = await task

    print(f"The result is {cpuBoundResult + ioBoundResult}")
asked May 21 '19 at 19:05 by Big AL


3 Answers

So with a bit more research it seems that this is possible but not quite as easy as in C#. The code for do_stuff() becomes:

async def do_stuff():
    task = asyncio.create_task(do_iobound_work_async())  # add task to event loop
    await asyncio.sleep(0)                               # return control to loop so task can start
    cpuBoundResult = do_cpu_intensive_calc()             # run blocking code synchronously
    ioBoundResult = await task                           # at last, we can await our async code

    print(f"The result is {cpuBoundResult + ioBoundResult}")

Versus C#, the two differences are:

  1. asyncio.create_task(...) is required to add the task to the running event loop.
  2. await asyncio.sleep(0) is required to temporarily return control to the event loop so it can start the task.
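The effect of those two steps can be checked by recording the order in which things happen. This is a minimal sketch with the timings scaled down (0.1 s of non-blocking IO, 0.05 s of blocking "CPU" work); the function names `io_work`, `cpu_work` and the `yield_first` flag are illustrative only. Without the `await asyncio.sleep(0)`, the task created by `create_task` does not get to run until `do_stuff` first suspends at `await task`.

```python
import asyncio
import time

async def io_work(log):
    log.append("io start")
    await asyncio.sleep(0.1)       # non-blocking wait, stands in for the API call
    log.append("io end")
    return 1

def cpu_work(log):
    log.append("cpu start")
    time.sleep(0.05)               # blocking: the event loop cannot run anything else
    log.append("cpu end")
    return 2

async def do_stuff(log, yield_first):
    task = asyncio.create_task(io_work(log))  # scheduled on the loop, not running yet
    if yield_first:
        await asyncio.sleep(0)                # hand control back once so the task can start
    cpu = cpu_work(log)
    io = await task
    return cpu + io

without_yield, with_yield = [], []
r1 = asyncio.run(do_stuff(without_yield, yield_first=False))
r2 = asyncio.run(do_stuff(with_yield, yield_first=True))
print(without_yield)  # ['cpu start', 'cpu end', 'io start', 'io end']
print(with_yield)     # ['io start', 'cpu start', 'cpu end', 'io end']
```

Only in the second run does the IO wait overlap the blocking work.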

The complete code sample is now:

import time
import asyncio

async def do_stuff():
    task = asyncio.create_task(do_iobound_work_async())  # add task to event loop
    await asyncio.sleep(0)                               # return control to loop so task can start
    cpuBoundResult = do_cpu_intensive_calc()             # run blocking code synchronously
    ioBoundResult = await task                           # at last, we can await our async code

    print(f"The result is {cpuBoundResult + ioBoundResult}")

async def do_iobound_work_async(): 
    print("Make API call...")
    await asyncio.sleep(2.5)  # non-blocking async call. Hence the use of asyncio
    print("Data back.")
    return 1

def do_cpu_intensive_calc():
    print("Do smart calc...")
    time.sleep(2)  # long blocking code that cannot be awaited. e.g. a spinning loop
    print("Calc finished.")
    return 2

await do_stuff()  # top-level await works in Jupyter/IPython; in a plain script use asyncio.run(do_stuff())

I am not a big fan of having to remember to add that extra await asyncio.sleep(0) in order to start the task. It might be neater to have an awaitable function like begin_task(...) that starts the task running automatically such that it can be awaited at a later stage. Something like the below, for instance:

async def begin_task(coro):
    """Awaitable function that adds a coroutine to the event loop and sets it running."""
    task = asyncio.create_task(coro)
    await asyncio.sleep(0)
    return task

async def do_stuff():
    io_task = await begin_task(do_iobound_work_async())
    cpuBoundResult = do_cpu_intensive_calc()
    ioBoundResult = await io_task
    print(f"The result is {cpuBoundResult + ioBoundResult}")
answered Oct 21 '22 at 13:10 by Big AL


Unfortunately, there is no reliable way to do this. When I wrote the "async" helpers for Stackless Python's Stacklesslib, I had this C# behaviour in mind. When I first encountered the async keyword in C#, it struck me as a brilliant way to perform depth-first execution until execution blocks, at which point execution continues from where an async function was last invoked.

The reason this is clever is that a program can schedule the start of blocking transactions (such as http requests) to happen at the earliest possible moment, reducing latency. When you start a method which you know will block, waiting for a reply, you want it to start executing immediately to improve the responsiveness of your program, then await the result when you actually need it.

The examples above work, but fail when there is more than one level of invocation. asyncio.sleep(0) does not guarantee anything:

import asyncio


async def startit(thing):
    t = asyncio.create_task(thing)
    # what we really need to do here is:
    # Insert t into runnable queue, just before asyncio.current_task(), and switch to it.
    # Only, it is not possible since event loops are just about scheduling callbacks
    await asyncio.sleep(0)
    return t


async def fa():
    print('fa start')
    gb = await startit(fb())
    # send off an HTTP request and wait for it
    print('fa doing blocking thing')
    await asyncio.sleep(0.1)
    print('fa waiting for gb')
    await gb
    print('fa stopping')
    return 'a'

async def fb():
    print('fb start')
    # send off another http request and wait for it
    await asyncio.sleep(0.1)
    print('fb stop')
    return 'b'


async def main():
    
    print('main start')
    ga = await startit(fa())
    print("main waiting for a")
    await ga
    print('main done')

asyncio.run(main())

This outputs:

main start
fa start
main waiting for a
fb start
fa doing blocking thing
fb stop
fa waiting for gb
fa stopping
main done

whereas you would want to have it output:

main start
fa start
fb start
fa doing blocking thing
main waiting for a
fb stop
fa waiting for gb
fa stopping
main done

Basically, when fb blocks, control moves all the way up to main() instead of jumping up one level of invocation to fa, which is what happens in C#'s depth-first model.

Stackless Python, on hitting a function decorated with @stacklessio.async, would:

  1. create a tasklet future
  2. Insert it into the runnable queue "before" the currently running tasklet
  3. switch to it.

When the new tasklet is blocked, the scheduler will switch to the "next" tasklet, which will be the previously running one.

Unfortunately, this is not easily possible to do in Python's "asyncio" framework because the scheduling there is based on callbacks, not tasks.
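For what it's worth, Python 3.12 added `asyncio.eager_task_factory`. Once it is installed with `loop.set_task_factory`, `create_task` starts running the coroutine synchronously and only hands it to the event loop when it first blocks, which is close to the depth-first behaviour described above. The sketch below reruns the fa/fb example under that factory, using plain `asyncio.create_task` in place of `startit` (no `sleep(0)` needed) and recording events in a list so the order can be checked. On Python versions older than 3.12 it falls back to default scheduling, where the order differs.

```python
import asyncio

events = []  # order in which each step runs

async def fb():
    events.append("fb start")
    await asyncio.sleep(0.1)          # stand-in for another HTTP request
    events.append("fb stop")
    return "b"

async def fa():
    events.append("fa start")
    gb = asyncio.create_task(fb())    # eager: fb runs until it first blocks
    events.append("fa doing blocking thing")
    await asyncio.sleep(0.1)          # stand-in for an HTTP request
    events.append("fa waiting for gb")
    await gb
    events.append("fa stopping")
    return "a"

async def main():
    loop = asyncio.get_running_loop()
    if hasattr(asyncio, "eager_task_factory"):    # Python 3.12+
        loop.set_task_factory(asyncio.eager_task_factory)
    events.append("main start")
    ga = asyncio.create_task(fa())    # eager: fa (and, inside it, fb) start now
    events.append("main waiting for a")
    await ga
    events.append("main done")

asyncio.run(main())
print("\n".join(events))
```

On 3.12+ the recorded order matches the "desired" output listed earlier in this answer.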

answered Oct 21 '22 at 13:10 by Kristján Valur


I think your test is pretty much self-explanatory. The predecessor of await and async in Python was the generator (from Python 2). Calling an async function only creates a coroutine object; Python will not start running it until you await it.

So if you want the coroutine to start at once, as in C#, you need to move the await line up.

async def do_stuff():
    ioBoundTask = do_iobound_work_async() # created a coroutine
    ioBoundResult = await ioBoundTask     # start the coroutine
    cpuBoundResult = do_cpu_intensive_calc()
    print(f"The result is {cpuBoundResult + ioBoundResult}")

This is equivalent to:

def do_stuff():
    # create a generator based coroutine
    # cannot mix syntax of asyncio
    ioBoundTask = do_iobound_work_async()
    ioBoundResult = yield from ioBoundTask
    # whatever
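The generator analogy can be checked directly: calling a generator function, like calling an `async def` function, runs none of its body. The body only runs once something drives the object, via `next()` for a generator, or `send(None)` for a coroutine (which is roughly what an asyncio Task does internally; you would not normally call it yourself). The names `gen` and `coro` are illustrative.

```python
log = []

def gen():
    log.append("gen body started")
    yield 1

async def coro():
    log.append("coro body started")
    return 2

g = gen()        # generator object created; body has not run
c = coro()       # coroutine object created; body has not run
assert log == []

next(g)          # runs the generator body up to its first yield
try:
    c.send(None) # drives the coroutine; it finishes without suspending
except StopIteration as stop:
    value = stop.value  # a coroutine's return value travels in StopIteration

print(log)    # ['gen body started', 'coro body started']
print(value)  # 2
```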

Also see this post: In practice, what are the main uses for the new "yield from" syntax in Python 3.3?


I noticed that your C# and Python are not strictly equivalent. Only asyncio.Task objects run concurrently in Python:

async def do_cpu_intensive_calc():
    print("Do smart calc...")
    await asyncio.sleep(2)
    print("Calc finished.")
    return 2

# 2.5s
async def do_stuff():
    task1 = asyncio.create_task(do_iobound_work_async())
    task2 = asyncio.create_task(do_cpu_intensive_calc())

    ioBoundResult = await task1
    cpuBoundResult = await task2
    print(f"The result is {cpuBoundResult + ioBoundResult}")

Now the execution time should be the same as in C# (about 2.5 seconds).
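If the CPU work really is blocking and cannot be rewritten with `await`, one alternative is to push it onto a worker thread with `asyncio.to_thread` (Python 3.9+), so both pieces of work are awaited and no `asyncio.sleep(0)` is needed. This is a sketch with the timings scaled down (0.3 s IO, 0.2 s blocking work); it is not part of the answer above.

```python
import asyncio
import time

async def do_iobound_work_async():
    await asyncio.sleep(0.3)      # scaled-down stand-in for the API call
    return 1

def do_cpu_intensive_calc():
    time.sleep(0.2)               # scaled-down blocking stand-in for the CPU work
    return 2

async def do_stuff():
    io_task = asyncio.create_task(do_iobound_work_async())
    # The blocking function runs in a worker thread, so the event loop stays
    # free to drive io_task while we wait for the thread to finish.
    cpu_result = await asyncio.to_thread(do_cpu_intensive_calc)
    io_result = await io_task
    return cpu_result + io_result

start = time.perf_counter()
result = asyncio.run(do_stuff())
elapsed = time.perf_counter() - start
print(result, round(elapsed, 1))  # total is roughly the longer of the two waits
```

For pure-Python number crunching, the GIL means the thread does not make the calculation itself any faster; it only keeps the event loop responsive. When the CPU work must run in parallel, `loop.run_in_executor` with a `concurrent.futures.ProcessPoolExecutor` is the usual route.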

answered Oct 21 '22 at 15:10 by knh190