I'm confused about how to use asyncio.Queue for a particular producer-consumer pattern in which both the producer and consumer operate concurrently and independently.
First, consider this example, which closely follows the one from the docs for asyncio.Queue:
    import asyncio
    import random
    import time

    async def worker(name, queue):
        while True:
            # Take a sleep duration off the queue and "work" on it.
            sleep_for = await queue.get()
            await asyncio.sleep(sleep_for)
            # Tell the queue that this item has been processed.
            queue.task_done()
            print(f'{name} has slept for {sleep_for:0.2f} seconds')

    async def main(n):
        queue = asyncio.Queue()
        total_sleep_time = 0
        for _ in range(20):
            sleep_for = random.uniform(0.05, 1.0)
            total_sleep_time += sleep_for
            queue.put_nowait(sleep_for)
        tasks = []
        for i in range(n):
            task = asyncio.create_task(worker(f'worker-{i}', queue))
            tasks.append(task)
        started_at = time.monotonic()
        await queue.join()
        total_slept_for = time.monotonic() - started_at
        for task in tasks:
            task.cancel()
        # Wait until all worker tasks are cancelled.
        await asyncio.gather(*tasks, return_exceptions=True)
        print('====')
        print(f'{n} workers slept in parallel for {total_slept_for:.2f} seconds')
        print(f'total expected sleep time: {total_sleep_time:.2f} seconds')

    if __name__ == '__main__':
        import sys
        # The worker count must be an int, and it must be passed to main().
        n = 3 if len(sys.argv) == 1 else int(sys.argv[1])
        asyncio.run(main(n))
There is one finer detail about this script: the items are put into the queue synchronously, with queue.put_nowait(sleep_for) in a conventional for-loop.
My goal is to create a script that uses async def worker() (or consumer()) and async def producer(). Both should be scheduled to run concurrently. No one consumer coroutine is explicitly tied to or chained from a producer.
How can I modify the program above so that the producer(s) is its own coroutine that can be scheduled concurrently with the consumers/workers?
There is a second example from PyMOTW. It requires the producer to know the number of consumers ahead of time, and uses None as a signal to the consumer that production is done.
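For reference, the sentinel approach described above might look roughly like the sketch below. This is not the PyMOTW code itself; the function names, the n_items and n_consumers parameters, and the results list are assumptions made for illustration. It shows why the producer needs to know the consumer count: it has to enqueue exactly one None per consumer.

```python
import asyncio

async def producer(queue, n_items, n_consumers):
    # Enqueue the work items first.
    for i in range(n_items):
        await queue.put(i)
    # Then enqueue one None per consumer, so that every consumer
    # receives exactly one "production is done" signal.
    for _ in range(n_consumers):
        await queue.put(None)

async def consumer(queue, results):
    while True:
        item = await queue.get()
        if item is None:
            # Sentinel received: exit instead of waiting forever.
            break
        results.append(item)
        await asyncio.sleep(0)  # stand-in for real work

async def main(n_items=10, n_consumers=3):
    queue = asyncio.Queue()
    results = []
    consumers = [asyncio.create_task(consumer(queue, results))
                 for _ in range(n_consumers)]
    await producer(queue, n_items, n_consumers)
    # The consumers return on their own, so no cancellation is needed.
    await asyncio.gather(*consumers)
    return sorted(results)

if __name__ == '__main__':
    print(asyncio.run(main()))
```

The drawback is the coupling: adding or removing a consumer means changing what the producer sends.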
Although asyncio queues are not thread-safe, they are designed to be used specifically in async/await code. Note that the methods of asyncio queues don't have a timeout parameter; use the asyncio.wait_for() function to do queue operations with a timeout.
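A minimal sketch of a queue read with a timeout, using asyncio.wait_for() as described above. The helper name get_with_timeout and the convention of returning None on timeout are assumptions for illustration, not part of the asyncio API.

```python
import asyncio

async def get_with_timeout(queue, timeout):
    # Queue.get() has no timeout parameter, so wrap it in wait_for().
    try:
        return await asyncio.wait_for(queue.get(), timeout=timeout)
    except asyncio.TimeoutError:
        # Illustrative convention only: None means "nothing arrived in time".
        return None

async def main():
    queue = asyncio.Queue()
    queue.put_nowait('job')
    first = await get_with_timeout(queue, 0.1)   # item is already there
    second = await get_with_timeout(queue, 0.1)  # queue is empty, times out
    return first, second

if __name__ == '__main__':
    print(asyncio.run(main()))
```

If None is itself a valid queue item, a dedicated sentinel object or letting the TimeoutError propagate would be a safer choice.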
asyncio.run() should be used as the main entry point for asyncio programs, and should ideally be called only once. It is new in Python 3.7.
The intended use of asyncio tasks is to allow independently running tasks to run 'concurrently' with other tasks within the same event loop.
How can I modify the program above so that the producer(s) is its own coroutine that can be scheduled concurrently with the consumers/workers?
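The example can be generalized without changing its essential logic:
- Move the insertion loop into a separate producer coroutine.
- Start the consumers in the background, so they process items as they are produced.
- With the consumers running, start the producers and wait for them to finish producing, as with await producer() or await gather(*producers), etc.
- Once all producers are done, wait for the consumers to process the remaining items with await queue.join().
- Cancel the consumers, which are now idle, waiting for items that will never arrive.

Here is an example implementing the above: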
    import asyncio
    import random

    async def rnd_sleep(t):
        # sleep for T seconds on average
        await asyncio.sleep(t * random.random() * 2)

    async def producer(queue):
        while True:
            # produce a token and send it to a consumer
            token = random.random()
            print(f'produced {token}')
            if token < .05:
                break
            await queue.put(token)
            await rnd_sleep(.1)

    async def consumer(queue):
        while True:
            token = await queue.get()
            # process the token received from a producer
            await rnd_sleep(.3)
            queue.task_done()
            print(f'consumed {token}')

    async def main():
        queue = asyncio.Queue()

        # fire up both the producers and the consumers
        producers = [asyncio.create_task(producer(queue))
                     for _ in range(3)]
        consumers = [asyncio.create_task(consumer(queue))
                     for _ in range(10)]

        # with both producers and consumers running, wait for
        # the producers to finish
        await asyncio.gather(*producers)
        print('---- done producing')

        # wait for the remaining tasks to be processed
        await queue.join()

        # cancel the consumers, which are now idle
        for c in consumers:
            c.cancel()

    asyncio.run(main())
Note that in real-life producers and consumers, especially those that involve network access, you probably want to catch IO-related exceptions that occur during processing. If the exception is recoverable, as most network-related exceptions are, you can simply catch the exception and log the error. You should still invoke task_done(), because otherwise queue.join() will hang due to an unprocessed item. If it makes sense to retry processing the item, you can return it to the queue prior to calling task_done(). For example:
    # like the above, but handling exceptions during processing:
    import aiohttp

    async def consumer(queue):
        while True:
            token = await queue.get()
            try:
                # this uses aiohttp or whatever; process() is a placeholder
                # for your own coroutine and is not defined here
                await process(token)
            except aiohttp.ClientError as e:
                print(f"Error processing token {token}: {e}")
                # If it makes sense, return the token to the queue to be
                # processed again. (You can use a counter to avoid
                # processing a faulty token infinitely.)
                #await queue.put(token)
            # always mark the item as done, even after an error,
            # so that queue.join() does not hang
            queue.task_done()
            print(f'consumed {token}')
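The retry counter mentioned in the comment could be sketched as follows. Everything here is hypothetical scaffolding: ProcessingError stands in for a recoverable error such as aiohttp.ClientError, process() simulates failures instead of doing network I/O, and MAX_ATTEMPTS is an arbitrary limit. The attempt number travels with the token as a tuple, and the token is re-queued before task_done() is called so that queue.join() keeps waiting for it.

```python
import asyncio

MAX_ATTEMPTS = 3  # arbitrary limit chosen for this sketch

class ProcessingError(Exception):
    """Stand-in for a recoverable error such as aiohttp.ClientError."""

async def process(token, attempt):
    # Simulated work: 'flaky' fails on its first attempt, 'bad' always fails.
    if token == 'bad' or (token == 'flaky' and attempt == 1):
        raise ProcessingError(f'cannot process {token!r}')

async def consumer(queue, done, failed):
    while True:
        token, attempt = await queue.get()
        try:
            await process(token, attempt)
            done.append(token)
        except ProcessingError:
            if attempt < MAX_ATTEMPTS:
                # Re-queue with an incremented counter before task_done().
                await queue.put((token, attempt + 1))
            else:
                # Give up on this token after MAX_ATTEMPTS tries.
                failed.append(token)
        finally:
            # Mark this queue entry as handled in every case.
            queue.task_done()

async def main():
    queue = asyncio.Queue()
    done, failed = [], []
    for token in ('ok', 'flaky', 'bad'):
        queue.put_nowait((token, 1))
    consumers = [asyncio.create_task(consumer(queue, done, failed))
                 for _ in range(2)]
    await queue.join()
    for c in consumers:
        c.cancel()
    await asyncio.gather(*consumers, return_exceptions=True)
    return sorted(done), failed

if __name__ == '__main__':
    print(asyncio.run(main()))
```

A real consumer would probably also wait between attempts rather than retrying immediately.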