I am trying to understand Python asyncio. I have something that I think works, but I don't think it's the canonical approach to coroutine programming.
I essentially want to create arbitrary graphs of processing such as the following:

How do I create a coroutine that can receive data from multiple coroutines? Is it asend?
How do I create a mesh where every producer sends to every consumer?
The code below creates the following graph:

class Producer:
  """Source node that pushes the integers 0..99 to every connected consumer.

  ``connections`` is the list of downstream objects; each must expose an
  awaitable ``asend(value)`` (for example a primed async generator).
  """

  def __init__(self, connections=None):
    # Start with an empty fan-out list so the generator can be created and
    # run even before a caller assigns ``connections``.
    self.connections = [] if connections is None else connections

  async def producer(self):
    """Async generator that yields a start-up marker, then fans out 0..99.

    The first ``__anext__()`` returns ``"started producer"``. The next
    resumption sends every integer in ``range(100)`` to each entry of
    ``self.connections`` via ``asend`` and then finishes, so that second
    ``__anext__()`` ends with ``StopAsyncIteration``.
    """
    yield "started producer"
    for i in range(100):
      # ``self.connections`` is re-read for every item, so wiring done after
      # the start-up yield is still picked up.
      for connection in self.connections:
        await connection.asend(i)
        
class Consumer:
  """Sink node: an async generator that prints every value sent to it."""

  def __init__(self, producers=None):
    # Kept so callers can record the upstream producers on the instance;
    # the receive loop itself does not read this attribute.
    self.producers = [] if producers is None else producers

  async def consumer(self):
    """Async generator that prints each received value as ``item <value>``.

    The first ``__anext__()`` returns ``"started consumer"`` (priming).
    Every later ``asend(value)`` prints the value and yields ``0`` back to
    the sender. Resuming with ``__anext__()`` delivers ``None``, which is
    printed like any other item.
    """
    # Capture the value delivered in reply to the start-up yield; discarding
    # it would silently drop the first item sent after priming.
    item = yield "started consumer"
    while True:
      print("item", item)
      item = yield 0
async def do():
  """Build a 10x10 producer/consumer mesh and run it until producers finish.

  Ten ``Producer`` and ten ``Consumer`` objects are created, their async
  generators are primed with one ``__anext__()`` each, and every producer is
  connected to every consumer generator. Producers are then advanced until
  each one is exhausted, after which the consumer generators are closed.
  """
  producers = [Producer() for _ in range(10)]
  consumers = [Consumer() for _ in range(10)]

  # Prime the consumers first: an async generator only accepts ``asend`` with
  # a real value once it has been advanced to its first ``yield``.
  consumeawait = []
  for b in consumers:
    cons = b.consumer()
    await cons.__anext__()
    consumeawait.append(cons)

  # Advance each producer to its start-up yield; nothing is sent yet.
  prodawait = []
  for a in producers:
    prod = a.producer()
    await prod.__anext__()
    prodawait.append(prod)

  # Full mesh: every producer fans out to every primed consumer generator.
  for producer in producers:
    producer.connections = consumeawait
  for consumer in consumers:
    consumer.producers = prodawait

  # Step every producer until it is exhausted. A finished async generator
  # signals completion with StopAsyncIteration; treat that as "done" instead
  # of letting it escape and abort the whole run after the first producer.
  pending = list(prodawait)
  while pending:
    for prod in list(pending):
      try:
        await prod.__anext__()
      except StopAsyncIteration:
        pending.remove(prod)

  # The consumers loop forever, so close them explicitly once no producer
  # can send anything more.
  for cons in consumeawait:
    await cons.aclose()
import asyncio
# Script entry point: run the do() coroutine on a fresh event loop.
asyncio.run(do())
I think you can look at asyncio.Queue to see how to distribute the workload between producers and consumers.
This example creates 5 producers and 3 consumers, connected by a single queue:
import random
import asyncio
async def producer(n, q):
    """Put three labelled messages on ``q``, pausing before each one.

    ``n`` is the producer's number (used only in the message text) and ``q``
    is the queue the messages are placed on. Each pause is a random whole
    number of seconds between 1 and 5.
    """
    item_no = 0
    while item_no < 3:
        # Simulated work before the item becomes available.
        pause = random.randint(1, 5)
        await asyncio.sleep(pause)
        message = f"Producer {n} put item {item_no}"
        q.put_nowait(message)
        item_no += 1
async def consumer(n, q):
    """Worker loop: take items from ``q`` forever and report each one.

    ``n`` is the worker's number (used only in the printed line). The loop
    has no exit condition of its own; it runs until the task is cancelled.
    """
    while True:
        data = await q.get()
        # Simulated processing time: a random 1-5 second pause per item.
        delay = random.randint(1, 5)
        await asyncio.sleep(delay)
        report = f"Worker {n}: Processed {data=} from queue."
        print(report)
        # Mark the item finished so that ``q.join()`` can return.
        q.task_done()
async def main():
    """Run 5 producers and 3 consumers over one shared queue, then shut down.

    Producers are awaited to completion, the queue is drained via ``join()``,
    and the consumer tasks (which loop forever) are cancelled and collected.
    """
    queue = asyncio.Queue()

    producer_tasks = []
    for idx in range(5):
        producer_tasks.append(asyncio.create_task(producer(idx, queue)))

    worker_tasks = []
    for idx in range(3):
        worker_tasks.append(asyncio.create_task(consumer(idx, queue)))

    # Step 1: every producer has put all of its items on the queue.
    await asyncio.gather(*producer_tasks)

    # Step 2: every queued item has been marked done by a consumer.
    await queue.join()

    # Step 3: stop the consumers and collect the cancellations as results
    # instead of letting them propagate.
    for worker in worker_tasks:
        worker.cancel()
    await asyncio.gather(*worker_tasks, return_exceptions=True)

    print("Done!")
Prints (for example):
Worker 0: Processed data='Producer 1 put item 0' from queue.
Worker 1: Processed data='Producer 2 put item 0' from queue.
Worker 1: Processed data='Producer 3 put item 0' from queue.
Worker 2: Processed data='Producer 4 put item 0' from queue.
Worker 0: Processed data='Producer 0 put item 0' from queue.
Worker 0: Processed data='Producer 0 put item 1' from queue.
Worker 2: Processed data='Producer 3 put item 1' from queue.
Worker 1: Processed data='Producer 1 put item 1' from queue.
Worker 0: Processed data='Producer 4 put item 1' from queue.
Worker 2: Processed data='Producer 1 put item 2' from queue.
Worker 1: Processed data='Producer 2 put item 1' from queue.
Worker 0: Processed data='Producer 0 put item 2' from queue.
Worker 2: Processed data='Producer 3 put item 2' from queue.
Worker 0: Processed data='Producer 2 put item 2' from queue.
Worker 1: Processed data='Producer 4 put item 2' from queue.
Done!
If you love us, you can donate to us via PayPal or buy us a coffee so we can maintain and grow! Thank you!
Donate Us With