I am trying to understand Python asyncio. I have something that I think works but I don't think it's a canonical approach to coroutine programming.
I essentially want to create arbitrary graphs of processing such as the following:

How do I create a coroutine that can receive data from multiple coroutines? Is it asend?
How do I create a mesh where every producer sends to every consumer?
The code below creates the following graph:

class Producer:
async def producer(self):
yield "started producer"
for i in range(0, 100):
for connection in self.connections:
await connection.asend(i)
class Consumer:
async def consumer(self):
yield "started consumer"
while True:
for connection in self.producers:
item = yield 0
print("item", item)
async def do():
producers = []
consumers = []
for i in range(0, 10):
producers.append(Producer())
for i in range(0, 10):
consumers.append(Consumer())
prodawait = []
consumeawait = []
for b in consumers:
cons = b.consumer()
await cons.__anext__()
consumeawait.append(cons)
for a in producers:
prod = a.producer()
prodawait.append(prod)
await prod.__anext__()
for producer in producers:
producer.connections = consumeawait
for consumer in consumers:
consumer.producers = prodawait
while True:
for consumer in consumeawait:
for item in prodawait:
await item.__anext__()
await consumer.__anext__()
import asyncio
asyncio.run(do())
I think you can look at asyncio.Queue how to distribute the workload between producers-consumers.
This example will create 5 producers and 3 consumers, connected by queue:
import random
import asyncio
async def producer(n, q):
for i in range(3):
await asyncio.sleep(random.randint(1, 5))
q.put_nowait(f"Producer {n} put item {i}")
async def consumer(n, q):
while True:
data = await q.get()
await asyncio.sleep(random.randint(1, 5))
print(f"Worker {n}: Processed {data=} from queue.")
q.task_done()
async def main():
q = asyncio.Queue()
producers = [asyncio.create_task(producer(n, q)) for n in range(5)]
consumers = [asyncio.create_task(consumer(n, q)) for n in range(3)]
# wait until all producers are finished
await asyncio.gather(*producers)
# wait until queue is empty
await q.join()
# cancel consumer tasks
for task in consumers:
task.cancel()
await asyncio.gather(*consumers, return_exceptions=True)
print("Done!")
Prints (for example):
Worker 0: Processed data='Producer 1 put item 0' from queue.
Worker 1: Processed data='Producer 2 put item 0' from queue.
Worker 1: Processed data='Producer 3 put item 0' from queue.
Worker 2: Processed data='Producer 4 put item 0' from queue.
Worker 0: Processed data='Producer 0 put item 0' from queue.
Worker 0: Processed data='Producer 0 put item 1' from queue.
Worker 2: Processed data='Producer 3 put item 1' from queue.
Worker 1: Processed data='Producer 1 put item 1' from queue.
Worker 0: Processed data='Producer 4 put item 1' from queue.
Worker 2: Processed data='Producer 1 put item 2' from queue.
Worker 1: Processed data='Producer 2 put item 1' from queue.
Worker 0: Processed data='Producer 0 put item 2' from queue.
Worker 2: Processed data='Producer 3 put item 2' from queue.
Worker 0: Processed data='Producer 2 put item 2' from queue.
Worker 1: Processed data='Producer 4 put item 2' from queue.
Done!
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With