Pretty new to python asyncio lib and have been banging my head to implement a keep alive task. I want to concurrently run a cpu intensive task and a keep alive task. The keep alive should run periodically until cpu intensive finished.
import asyncio
import time
async def cpu_intensive():
print("cpu_intensive for 3 seconds")
delay = 3
close_time = time.time() + delay
while True:
if time.time() > close_time:
break
async def keep_alive():
print("keep alive for 1 second") # My real use case is I want to send a heart beat message every x seconds until cpu intensive finished
await asyncio.sleep(1)
async def main():
cpu_intensive_task = asyncio.create_task(cpu_intensive())
keep_alive_task = asyncio.create_task(keep_alive())
print(f"Started at {time.strftime('%X')}")
# TODO: Not sure how to achieve the expected output
print(f"Finished at {time.strftime('%X')}")
asyncio.run(main())
'''
Expected Output
Started at 23:55:08
cpu_intensive 3 seconds
keep alive for 1 seconds
keep alive for 1 seconds
keep alive for 1 seconds
Finished at 23:55:11
'''
I have browse through the asyncio lib python and tried several API such as await, run_coroutine_threadsafe, asyncio.gather. But couldn't get it work.
I think you might have confused concept of concurrency with Parallelism. Let me write some of what I understood about when playing with asyncio:
Parallel: 'Physical' concurrency, where there could be more than 1 line of code being executed simultaneously at one point.
multiprocessingAsynchronous(await/async): 'Perceived' concurrency, where there can't be more than 1 line of code being executed at any given time, but achieve concurrency via context switching. Uses await keyword to allow context changes.
asyncio curio trioAsynchronous(time division): aka Thread. Thread in python can only execute 1 line of code at any given time due to GIL. Therefore shares similarity with above.
threadingSo, for any CPU intensive workload, it is better be parallelized.
For any IO bound workload (aka waiting), it is better coded asynchronously - because we don't need to utilize more cores anyway.
You need to await something in cpu_intensive coroutine.
As shown on this SO post We can use yield asyncio.sleep(0) to add context switching point inside workload. Of course this is not desired way of writing asynchronous code, but if you need to attach such function to async code it's a way.
import asyncio
import time
async def cpu_intensive():
print("cpu_intensive for 3 seconds")
duration = 3
close_time = time.time() + duration
while True:
if time.time() > close_time:
break
await asyncio.sleep(0)
async def keep_alive():
while True:
print("keep alive for 1 second")
await asyncio.sleep(1)
async def main():
print(f"Started at {time.strftime('%X')}")
cpu_intensive_task = asyncio.create_task(cpu_intensive())
asyncio.create_task(keep_alive())
await cpu_intensive_task
print(f"Finished at {time.strftime('%X')}")
asyncio.run(main())
"""
Started at 04:13:09
cpu_intensive for 3 seconds
keep alive for 1 second
keep alive for 1 second
keep alive for 1 second
keep alive for 1 second
Finished at 04:13:12
"""
There's one more keep alive because it check condition first then wait 1 seconds.
Do note asyncio.sleep is scheduling for event not actually is waiting accurate given time. Consider it like "Do whatever you want while I sleep, but just make sure to call me after X seconds."
P.S.
Later at one point, you'll realize the instability, hard error handling or inconsistency of asyncio and stumble upon to trio like I did, for that case I am leaving example for trio.
import trio
import time
class TaskDoneException(Exception):
pass
async def cpu_intensive():
print("cpu_intensive for 3 seconds")
duration = 3
close_time = time.time() + duration
while True:
if time.time() > close_time:
raise TaskDoneException()
await trio.sleep(0)
async def keep_alive():
while True:
print("keep alive for 1 second")
await trio.sleep(1)
async def main():
try:
async with trio.open_nursery() as nursery:
print(f"Started at {time.strftime('%X')}")
nursery.start_soon(cpu_intensive)
nursery.start_soon(keep_alive)
except TaskDoneException:
print(f"Finished at {time.strftime('%X')}")
trio.run(main)
'''
Output:
Started at 17:43:45
keep alive for 1 second
cpu_intensive for 3 seconds
keep alive for 1 second
keep alive for 1 second
keep alive for 1 second
Finished at 17:43:48
'''
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With