ZeroDivisionError
. await asyncio.gather(*tasks, return_exceptions=True)
above await queue.join()
, it will finally throw ZeroDivisionError
and stop.1 / 0
and run, it will execute everything, but will hang in the end.Now the question is, how can i achive both:
.
import asyncio
import random
import time
async def worker(name, queue):
while True:
print('Get a "work item" out of the queue.')
sleep_for = await queue.get()
print('Sleep for the "sleep_for" seconds.')
await asyncio.sleep(sleep_for)
# Error on purpose
1 / 0
print('Notify the queue that the "work item" has been processed.')
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
print('Create a queue that we will use to store our "workload".')
queue = asyncio.Queue()
print('Generate random timings and put them into the queue.')
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
print('Create three worker tasks to process the queue concurrently.')
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
print('Wait until the queue is fully processed.')
started_at = time.monotonic()
print('Joining queue')
await queue.join()
total_slept_for = time.monotonic() - started_at
print('Cancel our worker tasks.')
for task in tasks:
task.cancel()
print('Wait until all worker tasks are cancelled.')
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())
There are several ways to approach this, but the central idea is that in asyncio, unlike in classic threading, it is straightforward to await multiple things at once.
For example, you can await queue.join()
and the worker tasks, whichever completes first. Since workers don't complete normally (you cancel them later), a worker completing means that it has raised.
# convert queue.join() to a full-fledged task, so we can test
# whether it's done
queue_complete = asyncio.create_task(queue.join())
# wait for the queue to complete or one of the workers to exit
await asyncio.wait([queue_complete, *tasks], return_when=asyncio.FIRST_COMPLETED)
if not queue_complete.done():
# If the queue hasn't completed, it means one of the workers has
# raised - find it and propagate the exception. You can also
# use t.exception() to get the exception object. Canceling other
# tasks is another possibility.
for t in tasks:
if t.done():
t.result() # this will raise
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With