Using asyncio.Queue for producer-consumer flow

I'm confused about how to use asyncio.Queue for a particular producer-consumer pattern in which both the producer and consumer operate concurrently and independently.

First, consider this example, which closely follows that from the docs for asyncio.Queue:

    import asyncio
    import random
    import time

    async def worker(name, queue):
        while True:
            # Get a "work item" (a sleep duration) out of the queue.
            sleep_for = await queue.get()
            await asyncio.sleep(sleep_for)
            # Notify the queue that the work item has been processed.
            queue.task_done()
            print(f'{name} has slept for {sleep_for:0.2f} seconds')

    async def main(n):
        queue = asyncio.Queue()
        total_sleep_time = 0
        for _ in range(20):
            sleep_for = random.uniform(0.05, 1.0)
            total_sleep_time += sleep_for
            queue.put_nowait(sleep_for)
        tasks = []
        for i in range(n):
            task = asyncio.create_task(worker(f'worker-{i}', queue))
            tasks.append(task)
        started_at = time.monotonic()
        await queue.join()
        total_slept_for = time.monotonic() - started_at
        for task in tasks:
            task.cancel()
        # Wait until all worker tasks are cancelled.
        await asyncio.gather(*tasks, return_exceptions=True)
        print('====')
        print(f'{n} workers slept in parallel for {total_slept_for:.2f} seconds')
        print(f'total expected sleep time: {total_sleep_time:.2f} seconds')

    if __name__ == '__main__':
        import sys
        # The worker count arrives as a string, so convert it before use.
        n = 3 if len(sys.argv) == 1 else int(sys.argv[1])
        asyncio.run(main(n))

There is one finer detail about this script: the items are put into the queue synchronously, with queue.put_nowait(sleep_for) over a conventional for-loop.

My goal is to create a script that uses async def worker() (or consumer()) and async def producer(). Both should be scheduled to run concurrently. No one consumer coroutine is explicitly tied to or chained from a producer.

How can I modify the program above so that the producer(s) is its own coroutine that can be scheduled concurrently with the consumers/workers?


There is a second example from PYMOTW. It requires the producer to know the number of consumers ahead of time, and uses None as a signal to the consumer that production is done.
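For reference, the sentinel approach described above can be sketched roughly as follows. This is not the PyMOTW code itself, only a minimal illustration of the idea; `NUM_CONSUMERS`, the integer items, and the `results` list are assumptions made for the example.

```python
import asyncio

NUM_CONSUMERS = 3  # assumed value; the producer must know it up front


async def producer(queue, num_consumers):
    for item in range(10):
        await queue.put(item)
    # One None sentinel per consumer tells each of them to stop.
    for _ in range(num_consumers):
        await queue.put(None)


async def consumer(queue, results):
    while True:
        item = await queue.get()
        if item is None:
            # Sentinel received: acknowledge it and exit the loop.
            queue.task_done()
            break
        results.append(item)
        queue.task_done()


async def main():
    queue = asyncio.Queue()
    results = []
    consumers = [asyncio.create_task(consumer(queue, results))
                 for _ in range(NUM_CONSUMERS)]
    await producer(queue, NUM_CONSUMERS)
    await queue.join()
    # Every consumer exits on its own after seeing a sentinel.
    await asyncio.gather(*consumers)
    return results


if __name__ == '__main__':
    print(sorted(asyncio.run(main())))
```

The drawback is the coupling: the producer has to be told how many consumers exist, which is exactly what I would like to avoid.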

asked Sep 30 '18 22:09 by Brad Solomon



1 Answer

How can I modify the program above so that the producer(s) is its own coroutine that can be scheduled concurrently with the consumers/workers?

The example can be generalized without changing its essential logic:

  • Move the insertion loop to a separate producer coroutine.
  • Start the consumers in the background, letting them process the items as they are produced.
  • With the consumers running, start the producers and wait for them to finish producing items, as with await producer() or await gather(*producers), etc.
  • Once all producers are done, wait for consumers to process the remaining items with await queue.join().
  • Cancel the consumers, all of which are now idly waiting for the queue to deliver the next item, which will never arrive as we know the producers are done.

Here is an example implementing the above:

    import asyncio, random

    async def rnd_sleep(t):
        # sleep for T seconds on average
        await asyncio.sleep(t * random.random() * 2)

    async def producer(queue):
        while True:
            # produce a token and send it to a consumer
            token = random.random()
            print(f'produced {token}')
            if token < .05:
                break
            await queue.put(token)
            await rnd_sleep(.1)

    async def consumer(queue):
        while True:
            token = await queue.get()
            # process the token received from a producer
            await rnd_sleep(.3)
            queue.task_done()
            print(f'consumed {token}')

    async def main():
        queue = asyncio.Queue()

        # fire up both the producers and the consumers
        producers = [asyncio.create_task(producer(queue))
                     for _ in range(3)]
        consumers = [asyncio.create_task(consumer(queue))
                     for _ in range(10)]

        # with both producers and consumers running, wait for
        # the producers to finish
        await asyncio.gather(*producers)
        print('---- done producing')

        # wait for the remaining tasks to be processed
        await queue.join()

        # cancel the consumers, which are now idle
        for c in consumers:
            c.cancel()

    asyncio.run(main())

Note that in real-life producers and consumers, especially those that involve network access, you probably want to catch IO-related exceptions that occur during processing. If the exception is recoverable, as most network-related exceptions are, you can simply catch the exception and log the error. You should still invoke task_done(), because otherwise queue.join() will hang on the item that was never marked as processed. If it makes sense to retry the item, you can put it back into the queue before calling task_done(). For example:

    # like the above, but handling exceptions during processing:
    async def consumer(queue):
        while True:
            token = await queue.get()
            try:
                # this uses aiohttp or whatever
                await process(token)
            except aiohttp.ClientError as e:
                print(f"Error processing token {token}: {e}")
                # If it makes sense, return the token to the queue to be
                # processed again. (You can use a counter to avoid
                # processing a faulty token infinitely.)
                #await queue.put(token)
            queue.task_done()
            print(f'consumed {token}')
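The retry counter mentioned in the comment could be done in several ways. One possible sketch, assuming each queue item is a `(token, attempt)` tuple, is shown below. `MAX_ATTEMPTS`, `TransientError`, the `failures` dict, and this `process()` are hypothetical stand-ins so that the example runs without a network; in real code the exception would be something like `aiohttp.ClientError`.

```python
import asyncio

MAX_ATTEMPTS = 3  # assumed retry limit


class TransientError(Exception):
    """Hypothetical stand-in for a recoverable error."""


async def process(token, failures):
    # Hypothetical processing step: fails while failures[token] > 0.
    if failures.get(token, 0) > 0:
        failures[token] -= 1
        raise TransientError(f'temporary failure for {token}')


async def consumer(queue, failures, done, dropped):
    while True:
        token, attempt = await queue.get()
        try:
            await process(token, failures)
            done.append(token)
        except TransientError:
            if attempt < MAX_ATTEMPTS:
                # Requeue before task_done() so queue.join() keeps waiting.
                await queue.put((token, attempt + 1))
            else:
                # Give up on this token after MAX_ATTEMPTS tries.
                dropped.append(token)
        finally:
            queue.task_done()


async def main():
    queue = asyncio.Queue()
    failures = {'b': 1, 'c': 5}  # 'b' fails once, 'c' fails every time
    done, dropped = [], []
    for token in 'abc':
        queue.put_nowait((token, 1))
    consumers = [asyncio.create_task(consumer(queue, failures, done, dropped))
                 for _ in range(2)]
    await queue.join()
    for c in consumers:
        c.cancel()
    await asyncio.gather(*consumers, return_exceptions=True)
    return sorted(done), dropped


if __name__ == '__main__':
    print(asyncio.run(main()))
```

Carrying the attempt count inside the item keeps the consumers stateless, so any consumer can pick up a retried token.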
answered Oct 04 '22 09:10 by user4815162342