Let's say I have an async generator like this:
async def event_publisher(connection, queue):
    while True:
        if not await connection.is_disconnected():
            event = await queue.get()
            yield event
        else:
            return
I consume it like this:
published_events = event_publisher(connection, queue)
async for event in published_events:
    # do event processing here
    ...
It works just fine. However, when the connection is disconnected and no new event is published, the async for
will just wait forever, so ideally I would like to close the generator forcefully like this:
if await connection.is_disconnected():
    await published_events.aclose()
But I get the following error:
RuntimeError: aclose(): asynchronous generator is already running
Is there a way to stop processing of an already running generator?
It seems to be related to this issue. Notably:
As shown in https://gist.github.com/1st1/d9860cbf6fe2e5d243e695809aea674c, it's an error to close a synchronous generator while it is being iterated.
...
In 3.8, calling "aclose()" can crash with a RuntimeError. It's no longer possible to reliably cancel a running asynchronous generator.
Well, since we can't cancel a running asynchronous generator, let's try cancelling its execution instead.
import asyncio
from contextlib import suppress

async def cancel_gen(agen):
    # Schedule the next step of the generator, then cancel it right away.
    task = asyncio.create_task(agen.__anext__())
    task.cancel()
    with suppress(asyncio.CancelledError):
        await task
    await agen.aclose()  # probably a good idea,
                         # but if it raises errors, try commenting this line out
...
if await connection.is_disconnected():
    await cancel_gen(published_events)
I can't test whether it will work, since you didn't provide a reproducible example.
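For anyone who wants something self-contained to experiment with, here is a minimal sketch of a closely related idea: instead of closing the generator from outside, run the whole `async for` loop in its own task and cancel that task once the connection is gone. The cancellation is delivered to the pending `queue.get()` inside the generator, so the loop stops waiting. Note that this is a different technique from `cancel_gen` above, and that `FakeConnection`, `wait_disconnected`, and `consume` are hypothetical names invented for this illustration; they are not part of the original code or of any library.

```python
import asyncio
from contextlib import suppress


class FakeConnection:
    """Hypothetical stand-in for the question's connection object."""

    def __init__(self):
        self._gone = asyncio.Event()

    def disconnect(self):
        self._gone.set()

    async def is_disconnected(self):
        return self._gone.is_set()

    async def wait_disconnected(self):
        # Assumed helper: blocks until disconnect() has been called.
        await self._gone.wait()


async def event_publisher(connection, queue):
    # Same logic as the generator in the question.
    while not await connection.is_disconnected():
        yield await queue.get()


async def consume(connection, queue, received):
    async for event in event_publisher(connection, queue):
        received.append(event)  # event processing would go here


async def main():
    connection = FakeConnection()
    queue = asyncio.Queue()
    received = []
    for i in range(3):
        queue.put_nowait(i)

    # Run the consuming loop as a task so it can be cancelled from outside.
    consumer = asyncio.create_task(consume(connection, queue, received))

    # Simulate the client going away shortly after the queue is drained.
    asyncio.get_running_loop().call_later(0.1, connection.disconnect)
    await connection.wait_disconnected()

    # Cancelling the task interrupts the pending queue.get() in the generator.
    consumer.cancel()
    with suppress(asyncio.CancelledError):
        await consumer
    return received


received = asyncio.run(main())
```

Without the `consumer.cancel()` call, the loop in this sketch would stay blocked on the empty queue after the third event, which is the behaviour described in the question.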