
What's the canonical approach to creating multiple producers and consumers with coroutines in Python asyncio?

I am trying to understand Python asyncio. I have something that I think works but I don't think it's a canonical approach to coroutine programming.

I essentially want to create arbitrary graphs of processing such as the following:

[Image: consumer producer coroutines]

How do I create a coroutine that can receive data from multiple coroutines? Is it asend?

How do I create a mesh where every producer sends to every consumer?

The code below creates the following graph, in which each producer has multiple consumers:

class Producer:
   
  async def producer(self):
    yield "started producer"
    for i in range(0, 100):
      for connection in self.connections:
        
        await connection.asend(i)
        

class Consumer:
    
  async def consumer(self):
    yield "started consumer"
    while True:
      for connection in self.producers:
        item = yield 0
        print("item", item)



async def do():
  producers = []
  consumers = []
  for i in range(0, 10):
    producers.append(Producer())
  for i in range(0, 10):
    consumers.append(Consumer())
  
  prodawait = []
  consumeawait = []
  for b in consumers:
    cons = b.consumer()
    await cons.__anext__()
    consumeawait.append(cons)
  for a in producers:
    prod = a.producer()
    prodawait.append(prod)
    await prod.__anext__()

    
  for producer in producers:
    producer.connections = consumeawait
  for consumer in consumers:
    consumer.producers = prodawait
  

  while True:
    for consumer in consumeawait:
      
      for item in prodawait:
        await item.__anext__()
      await consumer.__anext__()

import asyncio
asyncio.run(do())
asked Oct 21 '25 11:10 by Samuel Squire


1 Answer

I think you can use asyncio.Queue to distribute the workload between producers and consumers.

This example creates 5 producers and 3 consumers, connected by a single shared queue:

import random
import asyncio


async def producer(n, q):
    for i in range(3):
        await asyncio.sleep(random.randint(1, 5))
        q.put_nowait(f"Producer {n} put item {i}")


async def consumer(n, q):
    while True:
        data = await q.get()
        await asyncio.sleep(random.randint(1, 5))
        print(f"Worker {n}: Processed {data=} from queue.")
        q.task_done()


async def main():
    q = asyncio.Queue()

    producers = [asyncio.create_task(producer(n, q)) for n in range(5)]
    consumers = [asyncio.create_task(consumer(n, q)) for n in range(3)]

    # wait until all producers are finished
    await asyncio.gather(*producers)

    # wait until queue is empty
    await q.join()

    # cancel consumer tasks
    for task in consumers:
        task.cancel()

    await asyncio.gather(*consumers, return_exceptions=True)

    print("Done!")


asyncio.run(main())

Prints (for example):

Worker 0: Processed data='Producer 1 put item 0' from queue.
Worker 1: Processed data='Producer 2 put item 0' from queue.
Worker 1: Processed data='Producer 3 put item 0' from queue.
Worker 2: Processed data='Producer 4 put item 0' from queue.
Worker 0: Processed data='Producer 0 put item 0' from queue.
Worker 0: Processed data='Producer 0 put item 1' from queue.
Worker 2: Processed data='Producer 3 put item 1' from queue.
Worker 1: Processed data='Producer 1 put item 1' from queue.
Worker 0: Processed data='Producer 4 put item 1' from queue.
Worker 2: Processed data='Producer 1 put item 2' from queue.
Worker 1: Processed data='Producer 2 put item 1' from queue.
Worker 0: Processed data='Producer 0 put item 2' from queue.
Worker 2: Processed data='Producer 3 put item 2' from queue.
Worker 0: Processed data='Producer 2 put item 2' from queue.
Worker 1: Processed data='Producer 4 put item 2' from queue.
Done!
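
Note that the shared queue above hands each item to exactly one consumer. For the mesh case from the question, where every producer sends to every consumer, one possible sketch is to give each consumer its own queue and have each producer put every item on all of them. A consumer then receives data from multiple producers simply by reading its own queue, without asend. The names run_mesh, inboxes, and results, and the use of None as a shutdown sentinel, are illustrative assumptions and not part of the original example:

```python
import asyncio


async def producer(name, inboxes, count):
    # Broadcast every item to every consumer's queue (full mesh).
    for i in range(count):
        for q in inboxes:
            await q.put((name, i))


async def consumer(name, inbox, results):
    # Receive items from any producer until the None sentinel arrives.
    while True:
        item = await inbox.get()
        if item is None:  # assumed shutdown marker, see note above
            break
        results[name].append(item)


async def run_mesh(n_producers=2, n_consumers=3, count=2):
    # One queue per consumer; every producer writes to all of them.
    inboxes = [asyncio.Queue() for _ in range(n_consumers)]
    results = {c: [] for c in range(n_consumers)}

    consumers = [
        asyncio.create_task(consumer(c, inboxes[c], results))
        for c in range(n_consumers)
    ]
    producers = [
        asyncio.create_task(producer(p, inboxes, count))
        for p in range(n_producers)
    ]

    # Wait for all producers, then tell each consumer to stop.
    await asyncio.gather(*producers)
    for q in inboxes:
        await q.put(None)
    await asyncio.gather(*consumers)
    return results


results = asyncio.run(run_mesh())
for name, items in results.items():
    print(f"Consumer {name} received {len(items)} items")
```

With this layout, an arbitrary graph can be expressed by choosing which queues each producer is given: passing only a subset of inboxes to a producer connects it to only those consumers.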
answered Oct 22 '25 23:10 by Andrej Kesely