Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to await a select.select call in python asyncio

I have a python 3.6 program where I am using the asyncio package event loops. One of my data sources comes from an api which was not build around asyncio. My connection object contains a member called _connection which is just a python socket. Right now I can use this in a select statement to tell when data is ready.

async def run(self):
    while True:
        if select.select([self._q._connection], [], [])[0]:
            msg = self._q.receive()
            print(msg)

What I would really like is...

async def run(self):
    while True:
        if await select.select([self._q._connection], [], [])[0]:
            msg = self._q.receive()
            print(msg)

I know there is a sock_recv function in the asyncio event loop however I need the api to do the actual reading and decoding. I tried this but it would just fall through the await which I guess makes sense since I said 0 bytes.

async def run(self):
    while True:
        print('A')
        await asyncio.get_event_loop().sock_recv(self._q._connection, 0)
        print('B')
        msg = self._q.receive()
        print(msg)

The only solution I can think of for now is to add a small timeout to the select and then call asyncio.sleep while there is no data but this seems like an inefficent approach. I wish there was something like asyncio.select. Do anyone want to recommend another approach?

EDIT: Right now I have come up with this. I don't like it because it adds an extra quarter second latency (probably doesn't matter much for my application but it still bugs me.)

async def run(self):
    while True:
        if select.select([self._q._connection], [], [], 0)[0]:
           print(self._q.receive())
        else:
            await asyncio.sleep(0.25)
like image 416
chasep255 Avatar asked Jan 14 '18 10:01

chasep255


1 Answers

You could use loop.add_reader to wait for the read availability of your socket:

async def watch(fd):
    future = asyncio.Future()
    loop.add_reader(fd, future.set_result, None)
    future.add_done_callback(lambda f: loop.remove_reader(fd))
    await future

async def run(self):
    while True:
        await watch(self._q._connection)
        msg = self._q.receive()
        print(msg)

However, it'll be very tricky to avoid all the blocking IO calls of the library you mentioned without rewriting it completely. Instead, I'd recommend to use the loop.run_in_executor method to schedule the blocking IO calls in a thread pool:

async def run(self):
    loop = asyncio.get_event_loop()
    while True:
        msg = await loop.run_in_executor(None, self._q.receive)
        print(msg)
like image 74
Vincent Avatar answered Sep 29 '22 04:09

Vincent