
Waiting on condition variable with timeout: lock not reacquired in time

I have an asyncio.Condition named cond. I want to wait on it, but give up after a timeout. asyncio.Condition.wait does not take a timeout argument, so this cannot be done directly. The docs state that asyncio.wait_for should be used to wrap the call and provide a timeout instead:

The asyncio.wait_for() function can be used to cancel a task after a timeout.

Thus, we arrive at the following solution:

async def coro():
    print("Taking lock...")
    async with cond:
        print("Lock acquired.")
        print("Waiting!")
        await asyncio.wait_for(cond.wait(), timeout=999)
        print("Was notified!")
    print("Lock released.")

Now assume that coro itself is cancelled five seconds after it starts running. This raises CancelledError inside wait_for, which cancels cond.wait and then immediately re-raises the error. The error propagates to coro, where leaving the async with block implicitly attempts to release the lock in cond. However, the lock is not held at that point: cond.wait has been cancelled, but it has not yet had a chance to process the cancellation and re-acquire the lock. Thus, we get an ugly exception like the following:

Taking lock...
Lock acquired.
Waiting!
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<coro() done, defined at [REDACTED]> exception=RuntimeError('Lock is not acquired.',)>
Traceback (most recent call last):
  [REDACTED], in coro
    await asyncio.wait_for(cond.wait(), timeout=999)
  [REDACTED], in wait_for
    yield from waiter
concurrent.futures._base.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  [REDACTED], in coro
    print("Was notified!")
  [REDACTED], in coro
    res = func(*args, **kw)
  [REDACTED], in __aexit__
    self.release()
  [REDACTED], in release
    raise RuntimeError('Lock is not acquired.')
RuntimeError: Lock is not acquired.

In other words, whilst handling the CancelledError, coro raised a RuntimeError from trying to release a lock that wasn't held. The stack trace points at the print("Was notified!") line because that is the last line of the offending async with block.
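
To make the failing state concrete, here is a minimal sketch (not taken from the original post) that inspects the condition while a waiter is parked in cond.wait(). It assumes Python 3.7+ for asyncio.run; the names demo, waiter and observed are illustrative only. It shows that the lock is not held during the wait, and that releasing it in that state raises the same RuntimeError that __aexit__ runs into.

```python
import asyncio

async def demo():
    cond = asyncio.Condition()
    observed = {}

    async def waiter():
        async with cond:
            await cond.wait()  # gives up the lock until notified

    task = asyncio.create_task(waiter())
    await asyncio.sleep(0.01)  # let waiter reach cond.wait()

    # While the waiter is parked, the underlying lock is free.
    observed["locked_while_waiting"] = cond.locked()

    try:
        # This is what __aexit__ effectively does if the lock
        # was never re-acquired after a cancelled wait.
        cond.release()
    except RuntimeError as exc:
        observed["release_error"] = type(exc).__name__

    # Clean shutdown: notify so the waiter re-acquires and exits normally.
    async with cond:
        cond.notify()
    await task
    return observed

result = asyncio.run(demo())
print(result)
```

If the reasoning above is right, the lock should be reported as not held while waiting, and the release attempt should fail with RuntimeError.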


This doesn't feel like something I can fix; I'm starting to suspect it's a bug in the library itself. However, I can't think of any way to avoid the issue or create a workaround, so any ideas would be appreciated.

Whilst writing this question and investigating further, I stumbled into similar problems on the Python bug tracker, ended up inspecting the asyncio source code, and determined that this is, in fact, a bug in asyncio itself.

I've submitted it to the issue tracker here for those who have the same problem, and answered my own question with a workaround that I have created.


EDIT: As requested by ParkerD, here is the full runnable example that produces the above issue:

EDIT 2: updated example to use the new asyncio.run and asyncio.create_task features from Python 3.7+

import asyncio

async def coro():
    cond = asyncio.Condition()
    print("Taking lock...")
    async with cond:
        print("Lock acquired.")
        print("Waiting!")
        await asyncio.wait_for(cond.wait(), timeout=999)
        print("Was notified!")
    print("Lock released.")

async def cancel_after_5(c):
    task = asyncio.create_task(c)
    await asyncio.sleep(5)
    task.cancel()
    await asyncio.wait([task])

asyncio.run(cancel_after_5(coro()))
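
As a hedged variation on the example above, the harness below reports how the task ended instead of relying on the "Task exception was never retrieved" log message. The helper name run_and_report and the shortened 0.05 second delay are my own choices, not part of the original report. On an interpreter affected by the problem I would expect it to report RuntimeError; on one where wait_for lets the inner wait finish its cancellation first, it should report a plain cancellation.

```python
import asyncio

async def coro(cond):
    async with cond:
        await asyncio.wait_for(cond.wait(), timeout=999)

async def run_and_report(delay=0.05):
    cond = asyncio.Condition()
    task = asyncio.create_task(coro(cond))
    await asyncio.sleep(delay)  # give coro time to start waiting
    task.cancel()
    await asyncio.wait([task])

    # Check cancelled() first: exception() raises on a cancelled task.
    if task.cancelled():
        return "cancelled"
    exc = task.exception()  # also marks the exception as retrieved
    return type(exc).__name__ if exc is not None else "finished"

outcome = asyncio.run(run_and_report())
print(outcome)
```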
Christopher Riches asked Oct 15 '22 08:10


1 Answer

As stated at the end of the question, I've determined that the issue is actually a bug in the library. I'll reiterate that the issue tracker for that bug is here, and present my workaround.

The following function is based on wait_for itself (source here), and is a version of it specialised for waiting on conditions, with the added guarantee that cancelling it is safe.

Calling wait_on_condition_with_timeout(cond, timeout) is roughly equivalent to asyncio.wait_for(cond.wait(), timeout).

async def wait_on_condition_with_timeout(condition: asyncio.Condition, timeout: float) -> bool:
    loop = asyncio.get_event_loop()

    # Create a future that will be triggered by either completion or timeout.
    waiter = loop.create_future()

    # Callback to trigger the future. The varargs are there to consume and void any arguments passed.
    # This allows the same callback to be used in loop.call_later and wait_task.add_done_callback,
    # which automatically passes the finished future in.
    def release_waiter(*_):
        if not waiter.done():
            waiter.set_result(None)

    # Set up the timeout
    timeout_handle = loop.call_later(timeout, release_waiter)

    # Launch the wait task
    wait_task = loop.create_task(condition.wait())
    wait_task.add_done_callback(release_waiter)

    try:
        await waiter  # Returns on wait complete or timeout
        if wait_task.done():
            return True
        else:
            raise asyncio.TimeoutError()

    except (asyncio.TimeoutError, asyncio.CancelledError):
        # If timeout or cancellation occur, clean up, cancel the wait, let it handle the cancellation,
        # then re-raise.
        wait_task.remove_done_callback(release_waiter)
        wait_task.cancel()
        await asyncio.wait([wait_task])
        raise

    finally:
        timeout_handle.cancel()

The crucial part is that if a timeout or cancellation occurs, the function waits for the cancelled condition.wait() task to finish, and therefore to re-acquire the lock, before re-raising the exception:

    except (asyncio.TimeoutError, asyncio.CancelledError):
        # If timeout or cancellation occur, clean up, cancel the wait, let it handle the cancellation,
        # then re-raise.
        wait_task.remove_done_callback(release_waiter)
        wait_task.cancel()
        await asyncio.wait([wait_task])  # This line is missing from the real wait_for
        raise

I've tested this on Python 3.6.9 and it works perfectly. The same bug exists in 3.7 and 3.8 too, so I imagine it is also useful for those versions. If you want to know when the bug will be fixed, check the issue tracker above. If you want a version for things other than Conditions, it should be trivial to change the parameter and create_task line.
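
For illustration, here is one way the generalisation mentioned above might look. This is a sketch under assumptions, not a tested drop-in: the name wait_for_cancel_safe is hypothetical, it accepts any awaitable via asyncio.ensure_future, returns the awaitable's result rather than True, and uses asyncio.get_running_loop and asyncio.run, so it needs Python 3.7+.

```python
import asyncio

async def wait_for_cancel_safe(awaitable, timeout):
    """Hypothetical generalisation of the workaround to any awaitable."""
    loop = asyncio.get_running_loop()
    waiter = loop.create_future()

    def release_waiter(*_):
        if not waiter.done():
            waiter.set_result(None)

    timeout_handle = loop.call_later(timeout, release_waiter)
    inner = asyncio.ensure_future(awaitable)
    inner.add_done_callback(release_waiter)

    try:
        await waiter  # resolves on completion or timeout
        if inner.done():
            return inner.result()
        raise asyncio.TimeoutError()
    except (asyncio.TimeoutError, asyncio.CancelledError):
        inner.remove_done_callback(release_waiter)
        inner.cancel()
        await asyncio.wait([inner])  # let the inner task finish its cleanup
        raise
    finally:
        timeout_handle.cancel()

async def main():
    cond = asyncio.Condition()
    events = []

    async def coro():
        async with cond:
            try:
                await wait_for_cancel_safe(cond.wait(), timeout=999)
            finally:
                # Record whether the lock is held again before __aexit__ runs.
                events.append(cond.locked())

    task = asyncio.create_task(coro())
    await asyncio.sleep(0.01)  # let coro reach the wait
    task.cancel()
    await asyncio.wait([task])
    return task.cancelled(), events

cancelled, events = asyncio.run(main())
print(cancelled, events)
```

The usage part cancels the outer task while it is waiting and records whether the lock is held again by the time the async with block exits. If the approach works as intended, the task ends as cancelled rather than failing with RuntimeError.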

Christopher Riches answered Oct 20 '22 10:10