Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to implement in python given two coroutine, keep run one until the other finished

Pretty new to python asyncio lib and have been banging my head to implement a keep alive task. I want to concurrently run a cpu intensive task and a keep alive task. The keep alive should run periodically until cpu intensive finished.

import asyncio
import time

async def cpu_intensive():
    print("cpu_intensive for 3 seconds")
    delay = 3
    close_time = time.time() + delay
    while True:
        if time.time() > close_time:
            break


async def keep_alive():
    print("keep alive for 1 second") # My real use case is I want to send a heart beat message every x seconds until cpu intensive finished
    await asyncio.sleep(1)

async def main():
    cpu_intensive_task = asyncio.create_task(cpu_intensive())
    keep_alive_task = asyncio.create_task(keep_alive())
    print(f"Started at {time.strftime('%X')}")
    # TODO: Not sure how to achieve the expected output
    print(f"Finished at {time.strftime('%X')}")

asyncio.run(main())

'''
Expected Output
Started at 23:55:08
cpu_intensive 3 seconds
keep alive for 1 seconds
keep alive for 1 seconds
keep alive for 1 seconds
Finished at 23:55:11
'''

I have browse through the asyncio lib python and tried several API such as await, run_coroutine_threadsafe, asyncio.gather. But couldn't get it work.

like image 425
xuanyue Avatar asked Apr 14 '26 20:04

xuanyue


1 Answers

I think you might have confused concept of concurrency with Parallelism. Let me write some of what I understood about when playing with asyncio:

Two way of achieving concurrency

  • Parallel: 'Physical' concurrency, where there could be more than 1 line of code being executed simultaneously at one point.

    • library: multiprocessing
    • pro: Can utilize Multiple core
    • con: Takes some resource to create processes. High overhead communicating with processes(pickle serialization is used). workload has to be thread-safe.
  • Asynchronous(await/async): 'Perceived' concurrency, where there can't be more than 1 line of code being executed at any given time, but achieve concurrency via context switching. Uses await keyword to allow context changes.

    • libraries: asyncio curio trio
    • pro: Can utilize one core better than synchronous code. Much lightweight. Control flows are more predictable than threading. (context switching ONLY happens on await keywords.)
    • con: Cannot utilize Multiple core. Can't run more than 1 code at any given time. Can't switch context at middle of heavy workload.
  • Asynchronous(time division): aka Thread. Thread in python can only execute 1 line of code at any given time due to GIL. Therefore shares similarity with above.

    • libraries: threading
    • pro: Can utilize one core better than synchronous code. (Because it run other code while waiting.) Much lightweight. Since it's using time division method it can run even under CPU heavy workload. (By stopping workload briefly and executing other thread)
    • con: Cannot utilize Multiple core. Can't run more than 1 code at any given time. Control flows are hard to predict.

So, for any CPU intensive workload, it is better be parallelized.

For any IO bound workload (aka waiting), it is better coded asynchronously - because we don't need to utilize more cores anyway.

Code fixes

asyncio

You need to await something in cpu_intensive coroutine.

As shown on this SO post We can use yield asyncio.sleep(0) to add context switching point inside workload. Of course this is not desired way of writing asynchronous code, but if you need to attach such function to async code it's a way.

import asyncio
import time


async def cpu_intensive():
    print("cpu_intensive for 3 seconds")
    duration = 3
    close_time = time.time() + duration
    while True:
        if time.time() > close_time:
            break

        await asyncio.sleep(0)


async def keep_alive():
    while True:
        print("keep alive for 1 second")
        await asyncio.sleep(1)


async def main():
    print(f"Started at {time.strftime('%X')}")

    cpu_intensive_task = asyncio.create_task(cpu_intensive())
    asyncio.create_task(keep_alive())

    await cpu_intensive_task

    print(f"Finished at {time.strftime('%X')}")


asyncio.run(main())

"""
Started at 04:13:09
cpu_intensive for 3 seconds
keep alive for 1 second
keep alive for 1 second
keep alive for 1 second
keep alive for 1 second
Finished at 04:13:12
"""

There's one more keep alive because it check condition first then wait 1 seconds.

Do note asyncio.sleep is scheduling for event not actually is waiting accurate given time. Consider it like "Do whatever you want while I sleep, but just make sure to call me after X seconds."


P.S.

Later at one point, you'll realize the instability, hard error handling or inconsistency of asyncio and stumble upon to trio like I did, for that case I am leaving example for trio.

trio

import trio
import time


class TaskDoneException(Exception):
    pass


async def cpu_intensive():
    print("cpu_intensive for 3 seconds")
    duration = 3
    close_time = time.time() + duration
    while True:
        if time.time() > close_time:
            raise TaskDoneException()

        await trio.sleep(0)


async def keep_alive():
    while True:
        print("keep alive for 1 second")
        await trio.sleep(1)


async def main():
    try:
        async with trio.open_nursery() as nursery:

            print(f"Started at {time.strftime('%X')}")
            nursery.start_soon(cpu_intensive)
            nursery.start_soon(keep_alive)
    except TaskDoneException:
        print(f"Finished at {time.strftime('%X')}")


trio.run(main)


'''
Output:
Started at 17:43:45
keep alive for 1 second
cpu_intensive for 3 seconds
keep alive for 1 second
keep alive for 1 second
keep alive for 1 second
Finished at 17:43:48
'''

like image 84
jupiterbjy Avatar answered Apr 16 '26 11:04

jupiterbjy