I have an asyncio.Condition named cond. I wish to wait on it, but only for so long before giving up. As asyncio.Condition.wait does not take a timeout, this cannot be done directly. The docs state that asyncio.wait_for should be used to wrap it and provide a timeout instead:
"The asyncio.wait_for() function can be used to cancel a task after a timeout."
Thus, we arrive at the following solution:
async def coro():
    print("Taking lock...")
    async with cond:
        print("Lock acquired.")
        print("Waiting!")
        await asyncio.wait_for(cond.wait(), timeout=999)
        print("Was notified!")
    print("Lock released.")
Now assume that coro itself is cancelled five seconds after being run. This raises CancelledError inside wait_for, which cancels the cond.wait call before re-raising the error. The error then propagates to coro, which, due to the async with block, implicitly attempts to release the lock in cond. However, the lock is not currently held; cond.wait has been cancelled but hasn't had a chance to process that cancellation and re-acquire the lock. Thus, we get an ugly exception like the following:
Taking lock...
Lock acquired.
Waiting!
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<coro() done, defined at [REDACTED]> exception=RuntimeError('Lock is not acquired.',)>
Traceback (most recent call last):
  [REDACTED], in coro
    await asyncio.wait_for(cond.wait(), timeout=999)
  [REDACTED], in wait_for
    yield from waiter
concurrent.futures._base.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  [REDACTED], in coro
    print("Was notified!")
  [REDACTED], in coro
    res = func(*args, **kw)
  [REDACTED], in __aexit__
    self.release()
  [REDACTED], in release
    raise RuntimeError('Lock is not acquired.')
RuntimeError: Lock is not acquired.
In other words, whilst handling the CancelledError, coro raised a RuntimeError from trying to release a lock that wasn't held. The stack trace shows the print("Was notified!") line because that is the last line of the offending async with block.
Whilst writing this question and investigating further, I stumbled into similar problems on the Python bug tracker, ended up inspecting the asyncio source code, and determined that this is, in fact, a bug in asyncio itself.
I've submitted it to the issue tracker here for those who have the same problem, and answered my own question with a workaround that I have created.
EDIT: As requested by ParkerD, here is the full runnable example that produces the above issue:
EDIT 2: Updated the example to use the new asyncio.run and asyncio.create_task features from Python 3.7+.
import asyncio

async def coro():
    cond = asyncio.Condition()
    print("Taking lock...")
    async with cond:
        print("Lock acquired.")
        print("Waiting!")
        await asyncio.wait_for(cond.wait(), timeout=999)
        print("Was notified!")
    print("Lock released.")

async def cancel_after_5(c):
    task = asyncio.create_task(c)
    await asyncio.sleep(5)
    task.cancel()
    await asyncio.wait([task])

asyncio.run(cancel_after_5(coro()))
As stated at the end of the question, I've determined that the issue is actually a bug in the library. I'll reiterate that the issue tracker for that bug is here, and present my workaround.
The following function is based on wait_for itself (source here), and is a version of it specialised for waiting on conditions, with the added guarantee that cancelling it is safe.
Calling wait_on_condition_with_timeout(cond, timeout) is roughly equivalent to asyncio.wait_for(cond.wait(), timeout).
import asyncio

async def wait_on_condition_with_timeout(condition: asyncio.Condition, timeout: float) -> bool:
    loop = asyncio.get_event_loop()

    # Create a future that will be triggered by either completion or timeout.
    waiter = loop.create_future()

    # Callback to trigger the future. The varargs are there to consume and void any arguments passed.
    # This allows the same callback to be used in loop.call_later and wait_task.add_done_callback,
    # which automatically passes the finished future in.
    def release_waiter(*_):
        if not waiter.done():
            waiter.set_result(None)

    # Set up the timeout
    timeout_handle = loop.call_later(timeout, release_waiter)

    # Launch the wait task
    wait_task = loop.create_task(condition.wait())
    wait_task.add_done_callback(release_waiter)

    try:
        await waiter  # Returns on wait complete or timeout
        if wait_task.done():
            return True
        else:
            raise asyncio.TimeoutError()

    except (asyncio.TimeoutError, asyncio.CancelledError):
        # If timeout or cancellation occur, clean up, cancel the wait, let it handle the cancellation,
        # then re-raise.
        wait_task.remove_done_callback(release_waiter)
        wait_task.cancel()
        await asyncio.wait([wait_task])
        raise

    finally:
        timeout_handle.cancel()
The crucial part is that if a timeout or cancellation occurs, the method waits for the condition to re-acquire the lock before re-raising the exception:
except (asyncio.TimeoutError, asyncio.CancelledError):
    # If timeout or cancellation occur, clean up, cancel the wait, let it handle the cancellation,
    # then re-raise.
    wait_task.remove_done_callback(release_waiter)
    wait_task.cancel()
    await asyncio.wait([wait_task])  # This line is missing from the real wait_for
    raise
I've tested this on Python 3.6.9 and it works perfectly. The same bug exists in 3.7 and 3.8 too, so I imagine it is also useful for those versions. If you want to know when the bug will be fixed, check the issue tracker above. If you want a version for things other than Conditions, it should be trivial to change the parameter and the create_task line.
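For anyone who wants that more general version, here is a rough sketch of how the change might look, together with a small check of both the cancellation and the timeout path. This is not part of the original workaround: the names wait_for_safely, waiter_task and main are made up for illustration, it uses asyncio.ensure_future in place of loop.create_task so that any awaitable is accepted, it returns the awaitable's result instead of True, and it assumes Python 3.7+ for asyncio.get_running_loop and asyncio.run.

```python
import asyncio


async def wait_for_safely(awaitable, timeout):
    """Hypothetical generalisation of wait_on_condition_with_timeout.

    Assumption: the caller passes any awaitable (for example cond.wait()).
    """
    loop = asyncio.get_running_loop()
    waiter = loop.create_future()

    def release_waiter(*_):
        # Shared by the timer and the task's done-callback.
        if not waiter.done():
            waiter.set_result(None)

    timeout_handle = loop.call_later(timeout, release_waiter)
    wait_task = asyncio.ensure_future(awaitable)
    wait_task.add_done_callback(release_waiter)
    try:
        await waiter  # Resumes on completion or on timeout
        if wait_task.done():
            return wait_task.result()
        raise asyncio.TimeoutError()
    except (asyncio.TimeoutError, asyncio.CancelledError):
        # Cancel the inner task and let it finish its own clean-up
        # (for a Condition: re-acquiring the lock) before re-raising.
        wait_task.remove_done_callback(release_waiter)
        wait_task.cancel()
        await asyncio.wait([wait_task])
        raise
    finally:
        timeout_handle.cancel()


async def waiter_task(cond, log):
    async with cond:
        try:
            await wait_for_safely(cond.wait(), timeout=999)
        except asyncio.CancelledError:
            # Record whether the lock is held again when the error arrives.
            log.append(("cancelled", cond.locked()))
            raise


async def main():
    cond = asyncio.Condition()
    log = []

    # Cancellation path: cancel the waiting task shortly after it starts.
    task = asyncio.create_task(waiter_task(cond, log))
    await asyncio.sleep(0.1)
    task.cancel()
    await asyncio.wait([task])
    log.append(("released", not cond.locked()))

    # Timeout path: nobody notifies, so the short timeout fires.
    async with cond:
        try:
            await wait_for_safely(cond.wait(), timeout=0.05)
        except asyncio.TimeoutError:
            log.append(("timed out", cond.locked()))
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In both paths the lock should be held again by the time the exception reaches the async with block, so the implicit release on exit has something to release.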