Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Unable to improve asyncio.gather speed by ordering I/O bound coroutines by descending wait times

Recently I came across this article that we can speed up asyncio.gather(*coroutines) by ordering coroutines by descending expected IO wait time.

Try to pass coroutines in asyncio.gather() ordered by the expected IO wait time descending. That is, the first argument should be the coroutine with the highest expected IO wait, and so on.

Link to article: https://blog.sentry.io/2022/02/25/how-we-optimized-python-api-server-code-100x/

I am trying to replicate this by:

Creating coroutines with

  1. CPU bound task,
  2. then an I/O bound task(network call) followed by another
  3. CPU bound task.

I will then use httpx as a client to call a local Flask server which serves API with different time delays.

For the I/O and CPU bound tasks I am:

a. I/O bound tasks - substituting the said SQL queries with network calls to a Flask server that I am running locally, with custom sleeps in different APIs.

b. CPU bound tasks - with a custom function that writes random numbers to a file.

However, I am unable to find consistent and tangible differences in the speeds when I order the coroutines by either ascending or descending I/O wait times.

Computer Specs: I am using python 3.10.6 on a Macbook M1. If you need more information please let me know

For more details, please see the code that I have used to try out the concept below:

Flask app / app.py --> Run by flask run

import asyncio
import time
from flask import Flask

app = Flask(__name__)

@app.route('/delay/1000')
async def delay_1000():
    await asyncio.sleep(1)
    return 'delay 1000'

@app.route('/delay/3000')
async def delay_3000():
    await asyncio.sleep(3)
    return 'delay 3000'

@app.route('/delay/5000')
async def delay_5000():
    await asyncio.sleep(5)
    return 'delay 5000'

test_delay.py file - where I am using httpx as a client to call the above flask server

import shortuuid
import random
import time
import asyncio
import httpx

def _create_file(file_name: str = None, d=3, n=1000, display_duration: bool = False):
    start = time.time()
    with open(file_name, 'w') as f:
        for _ in range(n):
            nums = [str(round(random.uniform(0, 1000), 3)) for _ in range(d)]
            f.write(' '.join(nums))
            f.write('\n')

    if display_duration:
        end = time.time()
        print(f"Duration for {file_name} is {end - start}")


async def make_req(client, url, name=None):
    print(f"{name} is running")
    start = time.time()
    # 1. CPU task
    _create_file(f"./text_files/{shortuuid.uuid()}.txt", d=5,n=500000, display_duration=True)

    # 2. I/O bound task (network call)
    res = await client.get(url, timeout=10.0)

    # 3. CPU task
    _create_file(f"./text_files/{shortuuid.uuid()}.txt", d=5,n=500000, display_duration=True)
    end = time.time()
    print(f"duration of {name} call is {end - start}s")

    return res


def generate_coros(client):
    one_sec_url = "http://127.0.0.1:5000/delay/1000"
    three_sec_url = "http://127.0.0.1:5000/delay/3000"
    five_sec_url = "http://127.0.0.1:5000/delay/5000"

    one_sec_coro = make_req(client, one_sec_url, name='a')
    three_sec_coro = make_req(client, three_sec_url, name='b')
    five_sec_coro = make_req(client, five_sec_url, name='c')

    return one_sec_coro, three_sec_coro, five_sec_coro

async def all_slow():
    async with httpx.AsyncClient() as client:
        start = time.time()
        query_a, query_b, query_c = generate_coros(client)

        await query_a
        await query_b
        await query_c
        end = time.time()

        print(f"Duration for not awaiting is {end - start}")


async def slow_then_fast():
    async with httpx.AsyncClient() as client:
        start = time.time()
        query_a, query_b, query_c = generate_coros(client)

        results = await asyncio.gather(query_c, query_b, query_a)
        end = time.time()

        print(f"Duration for slow then fast is {end - start}")


async def fast_then_slow():
    async with httpx.AsyncClient() as client:
        start = time.time()
        query_a, query_b, query_c = generate_coros(client)

        results = await asyncio.gather(query_a, query_b, query_c)
        end = time.time()

        print(f"Duration for fast then slow is {end - start}")

async def main():
    await slow_then_fast()
    await fast_then_slow()

asyncio.run(main())

As you can see, I have tried to order the slowest API call first in slow_then_fast and vice versa in the fast_then_slow functions to test out the above theory.

These are my results / Logs:

c is running
Duration for ./text_files/c7ujUaYgZyJUUhx5MEauf4.txt is 1.7627930641174316
b is running
Duration for ./text_files/Pz3jWKBqqUZWKvzqSnRBXj.txt is 1.8750889301300049
a is running
Duration for ./text_files/hwhuJtW8cwh4swLC9u4Ttb.txt is 1.7966642379760742
Duration for ./text_files/RRStJL5DBpB2dzggYJgDGf.txt is 1.7930638790130615
duration of a call is 4.613095998764038s
Duration for ./text_files/gRKwi92jSQkJAkGiumzN9B.txt is 2.0023791790008545
duration of b call is 8.693108081817627s
Duration for ./text_files/FYrVrLbtmYXWjcYyBYVBzu.txt is 1.848686933517456
duration of c call is 12.32311487197876s
Duration for slow then fast is 12.323269128799438
a is running
Duration for ./text_files/KVyHeuCQVQFHNYNHbyM5sc.txt is 1.7585160732269287
b is running
Duration for ./text_files/4FCgKBL2KCFeRTZGCGJyob.txt is 1.7451090812683105
c is running
Duration for ./text_files/i8AVjwhei8GtFGLjAk6Nei.txt is 1.749945878982544
Duration for ./text_files/cUSMAPx6czHb5k9LN3ZHt5.txt is 1.8131372928619385
duration of a call is 8.080067157745361s
Duration for ./text_files/VXVoFPCJQxn9tJtANC7oQL.txt is 1.7849910259246826
duration of b call is 8.291101932525635s
Duration for ./text_files/ge5duApKJQ9825DMaJeXoj.txt is 1.8465838432312012
duration of c call is 8.607009887695312s
Duration for fast then slow is 12.111705780029297

As we can see the duration for slow_then_fast of 12.32s is longer than fast_then_slow coroutine 12.11s. This results goes against the promised speed up in the article

I am not sure if my way of replicating this is wrong, or if this has been resolved in later versions of Python/Asyncio. As I am still new to profiling performance in python, I appreciate any help/advice that I can get/improve upon this. Thank you.

Edit

I have since tried another more direct approach to what the blog post suggests by simply putting sleeps into an SQL to postgres within the flask app:

import shortuuid
import asyncio
import time
import psycopg
from flask import Flask

app = Flask(__name__)


def _create_file(file_name: str = None, d=3, n=1000, display_duration: bool = False):
    start = time.time()
    with open(file_name, 'w') as f:
        for _ in range(n):
            nums = [str(round(random.uniform(0, 1000), 3)) for _ in range(d)]
            f.write(' '.join(nums))
            f.write('\n')

    if display_duration:
        end = time.time()
        print(f"Duration for {file_name} is {end - start}")


async def pg_sleep(duration:int):
    _create_file(f"./text_files/{shortuuid.uuid()}.txt", d=5,n=300000, display_duration=True)
    async with await psycopg.AsyncConnection.connect(
        "dbname=postgres user=postgres password=12345 port=50000 host=0.0.0.0"
    ) as aconn:
        async with aconn.cursor() as acur:
            await acur.execute(
                f"SELECT pg_sleep({duration})"
            )
    _create_file(f"./text_files/{shortuuid.uuid()}.txt", d=5,n=300000, display_duration=True)



@app.route('/fast_to_slow')
async def fast_to_slow():
    start = time.time()
    await asyncio.gather(pg_sleep(1), pg_sleep(3), pg_sleep(5))
    end = time.time()

    delay_duration = f'delay {end - start} duration'
    print(delay_duration)
    return delay_duration

@app.route('/slow_to_fast')
async def slow_to_fast():
    start = time.time()
    await asyncio.gather(pg_sleep(5), pg_sleep(3), pg_sleep(1))
    end = time.time()

    delay_duration = f'delay {end - start} duration'
    print(delay_duration)
    return delay_duration

if __name__ == '__main__':
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("--port")
    port = arg_parser.parse_args().port
    app.run(debug=True, port=port)

However, I am also not seeing a great difference in terms wall time:

Duration for ./text_files/SwrTSKNvB6d9gZdxgJGsZU.txt is 1.0756349563598633
Duration for ./text_files/PGjwZmUrGW9wWgbuYMsAih.txt is 1.0807139873504639
Duration for ./text_files/8WmZkViwjbw5cfPQzjuzBS.txt is 1.0467939376831055
Duration for ./text_files/BVgviCNZgPVkGbcHAR8oPG.txt is 1.0906238555908203
Duration for ./text_files/MqSAuH8BSYm2sWLFJ4ueq8.txt is 1.0889880657196045
Duration for ./text_files/USPppWYSjPftv8rZbiwDdV.txt is 1.096479892730713
delay 9.426467895507812 duration
127.0.0.1 - - [12/Mar/2023 11:08:54] "GET /slow_to_fast HTTP/1.1" 200 -
Duration for ./text_files/6ZS9SL38N5NKFaLdPjpLTN.txt is 1.1848421096801758
Duration for ./text_files/YsXka3m6DELFHXwAc4VSci.txt is 1.0680510997772217
Duration for ./text_files/3dFRbZazhncDC8vhEsjBf3.txt is 1.0647330284118652
Duration for ./text_files/HVfVkUnPLQnAC5V5wq6ACw.txt is 1.0934898853302002
Duration for ./text_files/oHAujuJ5PPqnqqLVS3nuMp.txt is 1.0853149890899658
Duration for ./text_files/hHRuoguqYbVsK6RuGE6HKf.txt is 1.0910439491271973
delay 9.443907022476196 duration
127.0.0.1 - - [12/Mar/2023 11:09:05] "GET /fast_to_slow HTTP/1.1" 200 -

---- Edit 2: ----

Based on @felipe's suggestion I have tried to replace the asyncio.sleep() with a network call to see if I can replicate the desired results:

import time
import timeit
import asyncio
import httpx

async def make_req(url, name=None, sleep_duration=0):
    # 1. CPU task (replaced with sleep)
    time.sleep(1)

    async with httpx.AsyncClient() as client:
        await client.get(url, timeout=10.0)

    # await asyncio.sleep(sleep_duration)

    # 3. CPU task (replaced with sleep)
    time.sleep(1)

one_sec_url = "http://127.0.0.1:5000/delay/1000"
three_sec_url = "http://127.0.0.1:6000/delay/3000"
five_sec_url = "http://127.0.0.1:7000/delay/5000"

async def query_a():
    await make_req(one_sec_url, name='a', sleep_duration=1)

async def query_b():
    await make_req(three_sec_url, name='b', sleep_duration=3)

async def query_c():
    await make_req(five_sec_url, name='c', sleep_duration=5)

# --- tests ---
async def t1():
    await asyncio.gather(query_a(),query_b(), query_c())

async def t2():
    await asyncio.gather(query_c(),query_b(), query_a())


run = timeit.timeit(lambda: asyncio.run(t1()), number=1)
print(f"Shortest I/O bound First: {run}")

run = timeit.timeit(lambda: asyncio.run(t2()), number=1)
print(f"Longest I/O bound first: {run}")

These are my results:

Shortest I/O bound First: 9.140814584000054
Longest I/O bound first: 9.072756332999916
like image 442
jcleow Avatar asked Sep 02 '25 02:09

jcleow


1 Answers

I am trying to replicate this by: Creating coroutines with

  1. CPU bound task,
  2. then an I/O bound task(network call) followed by another
  3. CPU bound task.

A quick refresher: Asyncio is used specifically for IO bound tasks - these are tasks that typically forces the CPU to sit idle during some period of time as it awaits an external reply.

The usual examples are network calls, disks retrievals, and the sort. In synchronous code a simple web request involves your CPU sending information to your hardware's network card and sitting idle until it receives a reply. During this period of CPU idleness the network card sends the request, the destination server receives, processes, and replies back; and only then, your network card sends it back to the CPU to continue where it left off.

Asyncio allows you to program your code in a way that once your CPU sends the required information to your hardware's network card, the CPU moves on to the next task in a list of tasks, periodically checking back-in to see if the original task it created has received a reply.

With that in mind, there is a very simple way to simulate synchronous and asynchronous IO in Python code, and that is through the use of time.sleep (for CPU bound tasks) and asyncio.sleep (for IO bound tasks). One will block the CPU, and the other will not.

And so, we can write a very simple test program to test out the theory the blog post put forth. Is starting an IO-bound task and then a CPU-bound task faster than the opposite?

import asyncio
import time
import timeit

async def io_bound():
    await asyncio.sleep(1)

async def cpu_bound():
    time.sleep(1)

async def t1():
    await asyncio.gather(cpu_bound(), io_bound(), io_bound())

async def t2():
    await asyncio.gather(io_bound(), io_bound(), cpu_bound())

run = timeit.timeit(lambda: asyncio.run(t1()), number=10)
print(f"CPU Bound First: {run}")

run = timeit.timeit(lambda: asyncio.run(t2()), number=10)
print(f"I/O Bound First: {run}")
CPU Bound First: 20.06762075
I/O Bound First: 10.058394083

And the answer is yes. Intuitively, this should also make sense. If you start an IO-bound task and then switch to doing a CPU-bound task, the IO-bound task is already "on its way" by the time you start the blocking CPU-bound task.


Misunderstood question based on "creating coroutines with" instead of "creating a coroutine with". The answer belows clarifies on this point. As it relates to the blog post, here is their setup.

We launch three coroutines that request data from a SQL DB. Athenian uses PostgreSQL, so let’s imagine that we work with PostgreSQL. Each coroutine passes through three stages:

  1. (CPU) Prepare the request to PostgreSQL and send it.
  2. (IO wait) Wait until PostgreSQL responds.
  3. (CPU) Read the response and convert it to Python objects.

Let’s suppose that (1) and (3) both elapse one second for each of the coroutines, and that PostgreSQL is infinitely powerful and always requires 5 seconds to compute the response for query_sql_a, 3 seconds for query_sql_b, and 1 second for query_sql_c. This doesn’t mean that, for example, query_sql_a will always spend 5 seconds in IO wait (2), because Python can only execute one of the three coroutines at each moment of time.

So effectively saying:

  • The query_sql_N function runs (in this fixed order) a CPU-bound instruction, an IO-bound instruction, and a CPU-bound instruction.
  • The functions take the following "hypothetical" amount of time N for the IO-bound instruction (not the entire function):
    • query_sql_a takes 5 seconds.
    • query_sql_b takes 3 seconds.
    • query_sql_c takes 1 second.
  • Within the functions, the following is the amount of time each instructions take:
    1. CPU-bound takes 1 second.
    2. I/O-bound takes N seconds.
    3. CPU-bound takes 1 second.

With this knowledge, we can do a quick 1:1 example of what they mean.

import asyncio
import time
import timeit

async def operation(total_time):

    # Runs the CPU-bound, IO-bound, and CPU-bound operations
    time.sleep(1)
    await asyncio.sleep(total_time)
    time.sleep(1)

async def t1():
    await asyncio.gather(operation(5), operation(3), operation(1))

async def t2():
    await asyncio.gather(operation(1), operation(3), operation(5))

run = timeit.timeit(lambda: asyncio.run(t1()), number=5)
print(f"Longest first: {run}")

run = timeit.timeit(lambda: asyncio.run(t2()), number=5)
print(f"Shortest first: {run}")
Longest first: 35.131927833
Shortest first: 45.100182916

Which is the expected results, and as described in the blog post:

Try to pass coroutines in asyncio.gather() ordered by the expected IO wait time descending. That is, the first argument should be the coroutine with the highest expected IO wait, and so on.

And once again, this should make intuitive sense. The issue is that the blog post really adds an extra complexity to problem by combining CPU-bound & IO-bound instructions into a single function, but nonetheless, this should also make intuitive sense;

If you start an IO-bound task and then switch to doing a CPU-bound task, the IO-bound task is already "on its way" by the time you start the blocking CPU-bound task.

Which means that if you start the longest IO-bound task first, you are in essence maximizing the period of time in which the IO-bound instructions (plural) are running in parallel to each other, versus spreading them out between different CPU-bound instructions. The blog post has some really nice graphics to showcase this concept.

You can also think of this from the perspective of the loop, and as it switches context between the different tasks.

operation(5), operation(3), operation(1)
t = 0; 
  OP(5): Blocking 1 second wait.

t = 1;
  OP(5): Not blocking 5 second wait (5 secs left).
  OP(3): Blocking 1 second wait.

t = 2;
  OP(5): Not blocking 5 second wait (4 secs left).
  OP(3): Not blocking 3 second wait (3 secs left).
  OP(1): Blocking 1 second wait.
 
t = 3;
  OP(5): Not blocking 5 second wait (3 secs left).
  OP(3): Not blocking 3 second wait (2 secs left).
  OP(1): Not blocking 1 second wait (1 sec left).

t = 4;
  OP(5): Not blocking 5 second wait (2 secs left).
  OP(3): Not blocking 3 second wait (1 secs left).
  OP(1): Blocking 1 second wait.

t = 5;
  OP(5): Not blocking 5 second wait (1 secs left).
  OP(3): Blocking 1 second wait.

t = 6;
  OP(5): Blocking 1 second wait.

t = 7;
  Done.

If you run the following test:

timeit.repeat(lambda: asyncio.run(t1()), number=1, repeat=3)

You will notice it takes almost exactly 7 seconds to finish that test.

Longest first: [7.028715833, 7.021865125, 7.025615832999998]

If you write out the diagram like the one above for the opposite test, you will also be able to guesstimate its execution time.

like image 110
Felipe Avatar answered Sep 07 '25 13:09

Felipe