I have been experimenting with asyncio
for a little while and read the PEPs; a few tutorials; and even the O'Reilly book.
I think I got the hang of it, but I'm still puzzled by the behavior of loop.close()
which I can't quite figure out when it is "safe" to invoke.
Distilled to its simplest, my use case is a bunch of blocking "old school" calls, which I wrap in the run_in_executor()
and an outer coroutine; if any of those calls goes wrong, I want to stop progress, cancel the ones still outstanding, print a sensible log and then (hopefully, cleanly) get out of the way.
Say, something like this:
import asyncio
import time
def blocking(num):
time.sleep(num)
if num == 2:
raise ValueError("don't like 2")
return num
async def my_coro(loop, num):
try:
result = await loop.run_in_executor(None, blocking, num)
print(f"Coro {num} done")
return result
except asyncio.CancelledError:
# Do some cleanup here.
print(f"man, I was canceled: {num}")
def main():
loop = asyncio.get_event_loop()
tasks = []
for num in range(5):
tasks.append(loop.create_task(my_coro(loop, num)))
try:
# No point in waiting; if any of the tasks go wrong, I
# just want to abandon everything. The ALL_DONE is not
# a good solution here.
future = asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
done, pending = loop.run_until_complete(future)
if pending:
print(f"Still {len(pending)} tasks pending")
# I tried putting a stop() - with/without a run_forever()
# after the for - same exception raised.
# loop.stop()
for future in pending:
future.cancel()
for task in done:
res = task.result()
print("Task returned", res)
except ValueError as error:
print("Outer except --", error)
finally:
# I also tried placing the run_forever() here,
# before the stop() - no dice.
loop.stop()
if pending:
print("Waiting for pending futures to finish...")
loop.run_forever()
loop.close()
I tried several variants of the stop()
and run_forever()
calls, the "run_forever first, then stop" seems to be the one to use according to the pydoc and, without the call to close()
yields a satisfying:
Coro 0 done
Coro 1 done
Still 2 tasks pending
Task returned 1
Task returned 0
Outer except -- don't like 2
Waiting for pending futures to finish...
man, I was canceled: 4
man, I was canceled: 3
Process finished with exit code 0
However, when the call to close()
is added (as shown above) I get two exceptions:
exception calling callback for <Future at 0x104f21438 state=finished returned int>
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/_base.py", line 324, in _invoke_callbacks
callback(self)
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/futures.py", line 414, in _call_set_state
dest_loop.call_soon_threadsafe(_set_state, destination, source)
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 620, in call_soon_threadsafe
self._check_closed()
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
which is at best annoying, but to me, totally puzzling: and, to make matter worse, I've been unable to figure out what would The Right Way of handling such a situation.
Thus, two questions:
what am I missing? how should I modify the code above in a way that with the call to close()
included does not raise?
what actually happens if I don't call close()
- in this trivial case, I presume it's largely redundant; but what might the consequences be in a "real" production code?
For my own personal satisfaction, also:
Many thanks in advance for any suggestions you may have!
The thing to remember is that you must be diligent if you want something done. That requires you to constantly communicate your desired results and close the loop to make sure it gets done. Like anything, closed-loop communications takes some practice.
Learn more about call loops here. A call loop occurs when call forwarding rules are set up in a way that allows a single call to go back to a destination infinitely, without allowing the call to reach a voicemail box.
Since the loop is infinite, so are the logs. The means it is possible to crash your PBX if a call gets forwarded too many times. Also, when a loop is in place, additional calls that hit the loop often receive a fast busy signal when dialed, preventing you from answering that call.
When to Use Closed-loop Control Instead of Open-loop Control Control operations can be either closed loop or open loop. Before a closed-loop control system can be designed or tuned, all parts must be in good working order, so sticking valves, gear backlash, and erratic sensors, for example, must be fixed.
Distilled to its simplest, my use case is a bunch of blocking "old school" calls, which I wrap in the
run_in_executor()
and an outer coroutine; if any of those calls goes wrong, I want to stop progress, cancel the ones still outstanding
This can't work as envisioned because run_in_executor
submits the function to a thread pool, and OS threads can't be cancelled in Python (or in other languages that expose them). Canceling the future returned by run_in_executor
will attempt to cancel the underlying concurrent.futures.Future
, but that will only have effect if the blocking function is not yet running, e.g. because the thread pool is busy. Once it starts to execute, it cannot be safely cancelled. Support for safe and reliable cancellation is one of the benefits of using asyncio
compared to threads.
If you are dealing with synchronous code, be it a legacy blocking call or longer-running CPU-bound code, you should run it with run_in_executor
and incorporate a way to interrupt it. For example, the code could occasionally check a stop_requested
flag and exit if that is true, perhaps by raising an exception. Then you can "cancel" those tasks by setting the appropriate flag or flags.
how should I modify the code above in a way that with the call to close() included does not raise?
As far as I can tell, there is currently no way to do so without modifications to blocking
and the top-level code. run_in_executor
will insist on informing the event loop of the result, and this fails when the event loop is closed. It doesn't help that the asyncio future is cancelled, because the cancellation check is performed in the event loop thread, and the error occurs before that, when call_soon_threadsafe
is called by the worker thread. (It might be possible to move the check to the worker thread, but it should be carefully analyzed whether it leads a race condition between the call to cancel()
and the actual check.)
why does it raise at all? what more does the loop want from the coros/tasks: they either exited; raised; or were canceled: isn't this enough to keep it happy?
It wants the blocking functions passed to run_in_executor
(literally called blocking
in the question) that have already been started to finish running before the event loop is closed. You cancelled the asyncio future, but the underlying concurrent future still wants to "phone home", finding the loop closed.
It is not obvious whether this is a bug in asyncio, or if you are simply not supposed to close an event loop until you somehow ensure that all work submitted to run_in_executor
is done. Doing so requires the following changes:
wait()
for those futures, as asyncio will consider them complete.loop.run_until_complete(asyncio.wait(pending))
before loop.close()
.With these modifications (except for the application-specific event - I simply let the sleep()
s finish their course), the exception did not appear.
what actually happens if I don't call
close()
- in this trivial case, I presume it's largely redundant; but what might the consequences be in a "real" production code?
Since a typical event loop runs as long as the application, there should be no issue in not call close()
at the very end of the program. The operating system will clean up the resources on program exit anyway.
Calling loop.close()
is important for event loops that have a clear lifetime. For example, a library might create a fresh event loop for a specific task, run it in a dedicated thread, and dispose of it. Failing to close such a loop could leak its internal resources (such as the pipe it uses for inter-thread wakeup) and cause the program to fail. Another example are test suites, which often start a new event loop for each unit test to ensure separation of test environments.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With