Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Run a function every n seconds in python with asyncio

I have an application which already runs infinitely with asyncio event loop run forever and also I need to run a specific function every 10 seconds.

def do_something():
   pass

a = asyncio.get_event_loop()
a.run_forever()

I would like to call the function do_something every 10 seconds. How to achieve this without replacing asynctio event loop with while loop ?

Edited: I can achieve this with the below code

def do_something():
   pass
while True:
   time.sleep(10)
   do_something()

But I dont want to use while loop to run infinitely in my application instead I would like to go with asyncio run_forever(). So how to call the same function every 10 seconds with asyncio ? is there any scheduler like which will not block my ongoing work ?


1 Answers

asyncio does not ship with a builtin scheduler, but it is easy enough to build your own. Simply combine a while loop with asyncio.sleep to run code every few seconds.

async def every(__seconds: float, func, *args, **kwargs):
    while True:
        func(*args, **kwargs)
        await asyncio.sleep(__seconds)

a = asyncio.get_event_loop()
a.create_task(every(1, print, "Hello World"))
...
a.run_forever()

Note that the design has to be slightly different if func is itself a coroutine or a long-running subroutine. In the former case use await func(...) and in the latter case use asyncio's thread capabilities.

like image 72
MisterMiyagi Avatar answered Sep 16 '25 11:09

MisterMiyagi