I have an asynchronous application that is serving requests via aiohttp and doing other async tasks (interacting with a database, handling messages, making requests itself as an HTTP client). I'd like to monitor how busy the event loop is, perhaps how much time it's spending executing code versus waiting for a select
to complete.
Is there a way to measure this with standard library event loops or other third-party loops (uvloop)?
Specifically, I'd like an ongoing percentage measure of saturation, not just a binary "is it busy" that this question seems to address.
run_in_executor is used to manage threads from within an event loop. To this end, it needs to wrap the thread into a Future, which needs to be assigned to an event loop (in one way or another). The reason the method is stored directly in a loop object is probably historical. It might as well have been asyncio.
get_event_loop () Get the current event loop. If there is no current event loop set in the current OS thread, the OS thread is main, and set_event_loop() has not yet been called, asyncio will create a new event loop and set it as the current one.
It'll block event loop at a point.
You can create multiple threads and run different event loops in each of them.
Digging source code shows following:
_run_once
in while True
loop_run_once
does all stuff including waiting for a select
to completetimeout
to wait for select
isn't stored anywhere outside _run_once
Nothing stops us to reimplementing _run_once
, so we can time what we need.
Instead of copying the whole _run_once
implementation, we can time right before select
, which is when _run_once
started (because before select
nothing time consuming happens) and time right after select
is when _process_events
started.
From words to action:
import asyncio
class MeasuredEventLoop(asyncio.SelectorEventLoop):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._total_time = 0
self._select_time = 0
self._before_select = None
# TOTAL TIME:
def run_forever(self):
started = self.time()
try:
super().run_forever()
finally:
finished = self.time()
self._total_time = finished - started
# SELECT TIME:
def _run_once(self):
self._before_select = self.time()
super()._run_once()
def _process_events(self, *args, **kwargs):
after_select = self.time()
self._select_time += after_select - self._before_select
super()._process_events(*args, **kwargs)
# REPORT:
def close(self, *args, **kwargs):
super().close(*args, **kwargs)
select = self._select_time
cpu = self._total_time - self._select_time
total = self._total_time
print(f'Waited for select: {select:.{3}f}')
print(f'Did other stuff: {cpu:.{3}f}')
print(f'Total time: {total:.{3}f}')
Let's test it:
import time
async def main():
await asyncio.sleep(1) # simulate I/O, will be handled by selectors
time.sleep(0.01) # CPU job, executed here, outside event loop
await asyncio.sleep(1)
time.sleep(0.01)
loop = MeasuredEventLoop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(main())
finally:
loop.close()
Result:
Waited for select: 2.000
Did other stuff: 0.032
Total time: 2.032
Let's test it against code with real I/O:
import aiohttp
async def io_operation(delay):
async with aiohttp.ClientSession() as session:
async with session.get(f'http://httpbin.org/delay/{delay}') as resp:
await resp.text()
async def main():
await asyncio.gather(*[
io_operation(delay=1),
io_operation(delay=2),
io_operation(delay=3),
])
Result:
Waited for select: 3.250
Did other stuff: 0.016
Total time: 3.266
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With