Basically I want:
await action1()
await action2()
return result
with a single timeout for both actions and - that's important - with an error message telling which action timed out.
For comparison, with just one action:
try:
await asyncio.wait_for(action(), timeout=1.0)
except asyncio.TimeoutError:
raise RuntimeError("Problem")
Now with two actions I have this and don't like it.
import asyncio
async def a2():
try:
await asyncio.sleep(1.0)
except asyncio.CancelledError:
raise RuntimeError("Problem 1") from None
try:
await asyncio.sleep(1.0)
except asyncio.CancelledError:
raise RuntimeError("Problem 2") from None
return True
async def test():
loop = asyncio.get_event_loop()
action_task = loop.create_task(a2())
# timeouts: 0.5 -> Problem1 exc; 1.5 -> Problem2 exc; 2.5 -> OK
try:
await asyncio.wait_for(action_task, timeout=0.5)
except asyncio.TimeoutError:
pass
result = await action_task
asyncio.get_event_loop().run_until_complete(test())
I find it really counter-intuitive to have:
except asyncio.TimeoutError:
pass
where timeout handling is the main functionality. Can you suggest a better way?
Can you suggest a better way?
Your code is correct, but if you're looking for something more elegant, perhaps a context manager would fit your usage:
class Timeout:
def __init__(self, tmout):
self.tmout = tmout
self._timed_out = False
self._section = None
async def __aenter__(self):
loop = asyncio.get_event_loop()
self._timer = loop.call_later(self.tmout, self._cancel_task,
asyncio.current_task())
return self
def set_section(self, section):
self._section = section
def _cancel_task(self, task):
self._timed_out = True
task.cancel()
async def __aexit__(self, t, v, tb):
if self._timed_out:
assert t is asyncio.CancelledError
raise RuntimeError(self._section) from None
else:
self._timer.cancel()
One would use it as follows:
async def main():
# raises RuntimeError("second sleep")
async with Timeout(1) as tmout:
tmout.set_section("first sleep")
# increase above 1.0 and "first sleep" is raised
await asyncio.sleep(0.8)
tmout.set_section("second sleep")
await asyncio.sleep(0.5)
asyncio.get_event_loop().run_until_complete(main())
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With