I have an application that already runs indefinitely using the asyncio event loop's run_forever(), and I also need to run a specific function every 10 seconds.
import asyncio

def do_something():
    pass

a = asyncio.get_event_loop()
a.run_forever()
I would like to call do_something every 10 seconds. How can I achieve this without replacing the asyncio event loop with a while loop?
Edit: I can achieve this with the code below:
import time

def do_something():
    pass

while True:
    time.sleep(10)
    do_something()
But I don't want to use a while loop that runs forever in my application; I would like to stay with asyncio's run_forever(). So how can I call the same function every 10 seconds with asyncio? Is there a scheduler that will not block my ongoing work?
asyncio does not ship with a built-in scheduler, but it is easy enough to build your own. Simply combine a while loop with asyncio.sleep to run code every few seconds.
import asyncio

async def every(__seconds: float, func, *args, **kwargs):
    # Call func, then yield to the event loop for __seconds before repeating.
    while True:
        func(*args, **kwargs)
        await asyncio.sleep(__seconds)

a = asyncio.get_event_loop()
a.create_task(every(1, print, "Hello World"))
...
a.run_forever()
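Note that the design has to be slightly different if func is itself a coroutine or a long-running subroutine. In the former case, use await func(...); in the latter case, use asyncio's thread capabilities, such as loop.run_in_executor or asyncio.to_thread (Python 3.9+), so that the call does not block the event loop.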
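As a rough sketch of those two cases, the variant below awaits func when it is a coroutine function and otherwise runs it in a worker thread. It assumes Python 3.9 or later for asyncio.to_thread. The max_runs parameter is a hypothetical addition, not part of the original answer, included only so the loop can be stopped in a short demo.

```python
import asyncio
import inspect


async def every(seconds, func, *args, max_runs=None, **kwargs):
    """Call func repeatedly, sleeping `seconds` between calls.

    max_runs is a hypothetical knob for demos and tests; leave it as None
    to repeat for as long as the event loop is running.
    """
    runs = 0
    while max_runs is None or runs < max_runs:
        if inspect.iscoroutinefunction(func):
            # func is a coroutine function: await it directly.
            await func(*args, **kwargs)
        else:
            # func is a plain (possibly slow) function: run it in a worker
            # thread so it does not block the event loop.
            await asyncio.to_thread(func, *args, **kwargs)
        runs += 1
        await asyncio.sleep(seconds)
```

In the application from the question, this would be scheduled with a.create_task(every(10, do_something)) before calling a.run_forever(). Running every plain function in a thread is a conservative choice; for very quick functions, calling func directly as in the original answer is simpler.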