I am experiencing a behaviour that I don't understand while trying to implement a select()-like functionality with asyncio (on Python 3.6):
Several queues are filled with Queue.put by async producers, and a consumer implements the select()-like functionality using asyncio.wait. The polling works like this:
    poll = list(_.get() for _ in queues)
    while True:
        done, pending = await asyncio.wait(tuple(poll), return_when=asyncio.FIRST_COMPLETED)
Then, iterating on the done futures, I replace the corresponding entries in poll with a fresh Queue.get:
    for f in done:
        try:
            i = poll.index(f._coro)
            ...
            poll[i] = queues[i].get()
Now, the odd behaviour that I experience is that the selection works, but for some of the done futures .result() returns None, and the messages sent through the queues are lost:
    PUT B ('B', 0)/0
    GET B ('B', 0)/0
    PUT A ('A', 0)/0
    GET A None/0      << ('A', 0) is never received
    PUT A ('A', 1)/0
    GET A ('A', 1)/0
    PUT B ('B', 1)/0
    GET B None/0
    PUT A ('A', 2)/0
    GET A None/0
    PUT B ('B', 2)/0
    GET B None/0
Here's the code:

    #!/usr/bin/env python3
    import asyncio, random

    async def queue_generator( name, queue, speed=1 ):
        counter = 0
        while True:
            t = (random.random() + 0.5) * speed
            await asyncio.sleep(t)
            m = (name, counter)
            print ("PUT {0} {1}/{2:d}".format(name, m, queue.qsize()))
            queue.put_nowait(m)
            counter += 1

    async def select( *queues ):
        poll = list(_.get() for _ in queues)
        while True:
            # That's the select()-like functionality
            done, pending = await asyncio.wait(tuple(poll), return_when=asyncio.FIRST_COMPLETED)
            for f in done:
                i = poll.index(f._coro)
                # That's where sometimes v is None
                v = f.result()
                print ("GET {0} {1}/{2}".format("AB"[i], v, queues[i].qsize()))
                poll[i] = queues[i].get()
            await asyncio.sleep(0.5)

    if __name__ == "__main__":
        loop = asyncio.get_event_loop()
        queue_a = asyncio.Queue(10)
        queue_b = asyncio.Queue(10)
        tasks = (
            loop.create_task(queue_generator("A", queue_a, 3)),
            loop.create_task(queue_generator("B", queue_b, 3)),
            loop.create_task(select(queue_a, queue_b))
        )
        loop.run_until_complete(asyncio.wait(tasks))
        loop.close()

As this happens whenever there is a change in the queue that currently has a message, I assume the problem lies in doing something with the pending futures. In fact, adding
    for f in pending:
        f.cancel()
solves the issue, but I'd like to re-use these futures as much as I can. I suppose the problem comes from the fact that asyncio.wait silently wraps the coroutines in the list into tasks.
I didn't dig deeply into what happens, but moving the creation of poll inside the loop seems to fix things:
    async def select( *queues ):
        while True:
            poll = list(_.get() for _ in queues)  # HERE
            # That's the select()-like functionality
            done, pending = await asyncio.wait(tuple(poll), return_when=asyncio.FIRST_COMPLETED)
poll is a list of coroutine objects. Each coroutine object should normally be awaited only once; awaiting or wrapping the same one again may lead to strange behaviour.
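If the goal is to re-use the pending futures, one option is to wrap each Queue.get() in a task yourself, exactly once, and replace only the tasks that have completed. The sketch below is a minimal illustration under that assumption, not a definitive implementation; select_once, demo and the tasks dictionary are hypothetical names that do not appear in the question. It sticks to asyncio.ensure_future and loop.run_until_complete so that it should also run on Python 3.6.

```python
import asyncio


async def select_once(queues, tasks):
    """Wait until at least one queue yields an item.

    `tasks` (hypothetical helper state) maps each outstanding Task to the
    index of its queue. It is updated in place, so tasks that are still
    pending are reused by the next call instead of being recreated.
    """
    # Make sure every queue has exactly one outstanding get() task.
    waiting = set(tasks.values())
    for i, q in enumerate(queues):
        if i not in waiting:
            tasks[asyncio.ensure_future(q.get())] = i
    done, _pending = await asyncio.wait(
        set(tasks), return_when=asyncio.FIRST_COMPLETED
    )
    # Remove finished tasks from the mapping; pending ones stay for reuse.
    return [(tasks.pop(t), t.result()) for t in done]


async def demo():
    queue_a, queue_b = asyncio.Queue(10), asyncio.Queue(10)
    queues = (queue_a, queue_b)
    tasks = {}
    received = []
    # Put one message at a time, switching between the queues.
    for msg, q in ((("B", 0), queue_b), (("A", 0), queue_a), (("A", 1), queue_a)):
        q.put_nowait(msg)
        received.extend(await select_once(queues, tasks))
    # Cancel the get() tasks that are still waiting and let them finish.
    for t in tasks:
        t.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return received


if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    print(loop.run_until_complete(demo()))
    loop.close()
```

Because a pending task stays in tasks between calls, no coroutine object is wrapped or awaited a second time, and no stale get() is left waiting on a queue where it could pick up an item unnoticed.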