I have a python 3.6 program where I am using the asyncio package event loops.  One of my data sources comes from an api which was not build around asyncio.  My connection object contains a member called _connection which is just a python socket.  Right now I can use this in a select statement to tell when data is ready.
async def run(self):
    while True:
        if select.select([self._q._connection], [], [])[0]:
            msg = self._q.receive()
            print(msg)
What I would really like is...
async def run(self):
    while True:
        if await select.select([self._q._connection], [], [])[0]:
            msg = self._q.receive()
            print(msg)
I know there is a sock_recv function in the asyncio event loop however I need the api to do the actual reading and decoding.  I tried this but it would just fall through the await which I guess makes sense since I said 0 bytes.
async def run(self):
    while True:
        print('A')
        await asyncio.get_event_loop().sock_recv(self._q._connection, 0)
        print('B')
        msg = self._q.receive()
        print(msg)
The only solution I can think of for now is to add a small timeout to the select and then call asyncio.sleep while there is no data but this seems like an inefficent approach.  I wish there was something like asyncio.select.  Do anyone want to recommend another approach?
EDIT: Right now I have come up with this. I don't like it because it adds an extra quarter second latency (probably doesn't matter much for my application but it still bugs me.)
async def run(self):
    while True:
        if select.select([self._q._connection], [], [], 0)[0]:
           print(self._q.receive())
        else:
            await asyncio.sleep(0.25)
                You could use loop.add_reader to wait for the read availability of your socket:
async def watch(fd):
    future = asyncio.Future()
    loop.add_reader(fd, future.set_result, None)
    future.add_done_callback(lambda f: loop.remove_reader(fd))
    await future
async def run(self):
    while True:
        await watch(self._q._connection)
        msg = self._q.receive()
        print(msg)
However, it'll be very tricky to avoid all the blocking IO calls of the library you mentioned without rewriting it completely. Instead, I'd recommend to use the loop.run_in_executor method to schedule the blocking IO calls in a thread pool:
async def run(self):
    loop = asyncio.get_event_loop()
    while True:
        msg = await loop.run_in_executor(None, self._q.receive)
        print(msg)
                        If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With