
Python asyncio queue not showing any exceptions

  1. If I run the code below, it hangs without raising ZeroDivisionError.
  2. If I move await asyncio.gather(*tasks, return_exceptions=True) above await queue.join(), it finally raises ZeroDivisionError and stops.
  3. If I then comment out 1 / 0 and run it, it processes everything but hangs at the end.

Now the question is, how can I achieve both:

  1. Being able to see unexpected exceptions, as in case 2 above, and
  2. Actually stopping when all tasks in the Queue are done?

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        print('Get a "work item" out of the queue.')
        sleep_for = await queue.get()

        print('Sleep for the "sleep_for" seconds.')
        await asyncio.sleep(sleep_for)

        # Error on purpose
        1 / 0

        print('Notify the queue that the "work item" has been processed.')
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')

async def main():
    print('Create a queue that we will use to store our "workload".')
    queue = asyncio.Queue()

    print('Generate random timings and put them into the queue.')
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    print('Create three worker tasks to process the queue concurrently.')
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    print('Wait until the queue is fully processed.')
    started_at = time.monotonic()

    print('Joining queue')
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    print('Cancel our worker tasks.')
    for task in tasks:
        task.cancel()

    print('Wait until all worker tasks are cancelled.')
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')

asyncio.run(main())
Superbman asked Mar 16 '20 00:03



1 Answer

There are several ways to approach this, but the central idea is that in asyncio, unlike in classic threading, it is straightforward to await multiple things at once.

For example, you can await queue.join() and the worker tasks, whichever completes first. Since workers don't complete normally (you cancel them later), a worker completing means that it has raised.

# convert queue.join() to a full-fledged task, so we can test
# whether it's done
queue_complete = asyncio.create_task(queue.join())

# wait for the queue to complete or one of the workers to exit
await asyncio.wait([queue_complete, *tasks], return_when=asyncio.FIRST_COMPLETED)

if not queue_complete.done():
    # If the queue hasn't completed, it means one of the workers has
    # raised - find it and propagate the exception.  You can also
    # use t.exception() to get the exception object. Canceling other
    # tasks is another possibility.
    for t in tasks:
        if t.done():
            t.result()  # this will raise
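
For completeness, here is a minimal self-contained sketch of how the snippet above could be wired into a full program. The names process and results, and the use of 1 / item as the "work", are assumptions made for illustration and are not from the question. The try/finally is one possible way to make sure the workers are cancelled in both the success and the failure case.

```python
import asyncio


async def worker(name, queue, results):
    while True:
        item = await queue.get()
        await asyncio.sleep(0)     # stand-in for real asynchronous work
        results.append(1 / item)   # raises ZeroDivisionError when item == 0
        queue.task_done()


async def process(items, n_workers=3):
    # Hypothetical helper: run the workers over items and return the results.
    queue = asyncio.Queue()
    for item in items:
        queue.put_nowait(item)

    results = []
    tasks = [
        asyncio.create_task(worker(f'worker-{i}', queue, results))
        for i in range(n_workers)
    ]
    queue_complete = asyncio.create_task(queue.join())
    try:
        # Wake up when the queue is drained or when any worker exits.
        await asyncio.wait([queue_complete, *tasks],
                           return_when=asyncio.FIRST_COMPLETED)
        if not queue_complete.done():
            # A worker exited before the queue was drained, so it raised.
            for t in tasks:
                if t.done():
                    t.result()  # re-raises the worker's exception
    finally:
        # Stop everything that is still running, whatever the outcome.
        queue_complete.cancel()
        for t in tasks:
            t.cancel()
        await asyncio.gather(queue_complete, *tasks, return_exceptions=True)
    return results


if __name__ == '__main__':
    print(asyncio.run(process([1, 2, 4])))
```

With this shape, asyncio.run(process([1, 0, 2])) should surface the ZeroDivisionError instead of hanging, and a run without a failing item returns once the queue is empty.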
user4815162342 answered Nov 15 '22 06:11
