
Implementing select-like functionality with asyncio

I am experiencing a behaviour that I don't understand while trying to implement a select()-like functionality with asyncio (on Python 3.6):

  • I have 2 queues (A & B) to which async producers add messages with Queue.put
  • I have an async consumer (the selector) that polls the queues and takes the first message available (i.e. a select()-like functionality using asyncio.wait).

The polling works like this:

poll = list(_.get() for _ in queues)
while True:
    done, pending = await asyncio.wait(tuple(poll), return_when=asyncio.FIRST_COMPLETED)

Then, iterating on the done futures, I replace the corresponding entries in poll with a fresh Queue.get:

    for f in done:
        try:
            i = poll.index(f._coro)
            ...
            poll[i] = queues[i].get()

Now, the odd behaviour that I experience is that the selection works, but for some of the done futures .result() returns None, and the corresponding messages sent through the queues are lost:

PUT B ('B', 0)/0
GET B ('B', 0)/0
PUT A ('A', 0)/0
GET A None/0         << ('A', 0) is never received
PUT A ('A', 1)/0
GET A ('A', 1)/0
PUT B ('B', 1)/0
GET B None/0
PUT A ('A', 2)/0
GET A None/0
PUT B ('B', 2)/0
GET B None/0

Here's the code:

#!/usr/bin/env python3
import asyncio, random

async def queue_generator( name, queue, speed=1 ):
    counter = 0
    while True:
        t = (random.random() + 0.5) * speed
        await asyncio.sleep(t)
        m = (name, counter)
        print ("PUT {0} {1}/{2:d}".format(name, m, queue.qsize()))
        queue.put_nowait(m)
        counter += 1

async def select( *queues ):
    poll = list(_.get() for _ in queues)
    while True:
        # That's the select()-like functionality
        done, pending = await asyncio.wait(tuple(poll), return_when=asyncio.FIRST_COMPLETED)
        for f in done:
            i = poll.index(f._coro)
            # That's where sometimes v is None
            v = f.result()
            print ("GET {0} {1}/{2}".format("AB"[i], v, queues[i].qsize()))
            poll[i] = queues[i].get()
        await asyncio.sleep(0.5)

if __name__ == "__main__":
    loop     = asyncio.get_event_loop()
    queue_a  = asyncio.Queue(10)
    queue_b  = asyncio.Queue(10)
    tasks = (
        loop.create_task(queue_generator("A", queue_a, 3)),
        loop.create_task(queue_generator("B", queue_b, 3)),
        loop.create_task(select(queue_a, queue_b))
    )
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

As this happens whenever there is a change in the queue that currently has a message, I assume the problem lies in doing something with the pending futures. In fact, adding

    for f in pending:
        f.cancel()

solves the issue, but I'd like to re-use these futures as much as I can. I suppose the problem comes from the fact that asyncio.wait silently wraps the coroutines in the list into tasks.
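
One way the pending futures could be kept across iterations is to wrap each get() coroutine in a task explicitly, once, and replace only the task that completed. The following is a minimal sketch under stated assumptions, not a definitive implementation: producer, demo and the limit parameter are hypothetical names added so the example terminates, and asyncio.ensure_future is used so that it does not rely on asyncio.wait accepting bare coroutines.

```python
import asyncio

async def producer(name, queue, count):
    # Hypothetical stand-in for queue_generator: puts `count` messages, then stops.
    for n in range(count):
        await asyncio.sleep(0.01)
        await queue.put((name, n))

async def select(queues, limit):
    """Collect `limit` messages from `queues`, whichever arrives first."""
    # Wrap each get() coroutine in a Task exactly once; map task -> queue index.
    tasks = {asyncio.ensure_future(q.get()): i for i, q in enumerate(queues)}
    received = []
    try:
        while len(received) < limit:
            done, _pending = await asyncio.wait(
                list(tasks), return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                i = tasks.pop(task)
                received.append((i, task.result()))
                # Replace only the finished task; pending ones stay in `tasks`.
                tasks[asyncio.ensure_future(queues[i].get())] = i
    finally:
        # Cancel whatever is still waiting so no task outlives the selector.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
    return received

async def demo():
    queue_a, queue_b = asyncio.Queue(10), asyncio.Queue(10)
    producers = [
        asyncio.ensure_future(producer("A", queue_a, 3)),
        asyncio.ensure_future(producer("B", queue_b, 3)),
    ]
    received = await select([queue_a, queue_b], limit=6)
    await asyncio.gather(*producers)
    return received

if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    try:
        for index, message in loop.run_until_complete(demo()):
            print("GET", "AB"[index], message)
    finally:
        loop.close()
```

Keying the dictionary by task avoids looking up the private f._coro attribute, and because each queue only ever has one outstanding get() task, no older waiter is left behind to consume a message unnoticed.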

Sébastien Pierre asked Mar 21 '26 00:03



1 Answer

I didn't dig deeply into what happens, but moving the creation of poll inside the loop seems to fix things:

async def select( *queues ):
    while True:
        poll = list(_.get() for _ in queues)  # HERE
        # That's the select()-like functionality
        done, pending = await asyncio.wait(tuple(poll), return_when=asyncio.FIRST_COMPLETED)

poll is a list of coroutine objects. Each coroutine object should normally be awaited only once; awaiting the same one again may lead to strange behaviour.
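
To illustrate the "awaited once" point, here is a small sketch, assuming a native async def coroutine (answer and demo are hypothetical names). Awaiting the same finished coroutine object a second time raises a RuntimeError instead of running it again; older generator-based coroutines may not raise and can fail less visibly.

```python
import asyncio

async def answer():
    return 42

async def demo():
    coro = answer()           # a single coroutine object
    first = await coro        # the first await runs it to completion
    try:
        await coro            # second await of the same, now finished, object
    except RuntimeError as exc:
        return first, exc
    return first, None

if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    try:
        print(loop.run_until_complete(demo()))
    finally:
        loop.close()
```

Calling answer() again creates a fresh coroutine object, which is why a new queues[i].get() is needed each time a message has been consumed.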

Mikhail Gerasimov answered Mar 22 '26 14:03



