I have a python 3.6 program where I am using the asyncio package event loops. One of my data sources comes from an api which was not build around asyncio. My connection object contains a member called _connection
which is just a python socket. Right now I can use this in a select statement to tell when data is ready.
async def run(self):
while True:
if select.select([self._q._connection], [], [])[0]:
msg = self._q.receive()
print(msg)
What I would really like is...
async def run(self):
while True:
if await select.select([self._q._connection], [], [])[0]:
msg = self._q.receive()
print(msg)
I know there is a sock_recv
function in the asyncio event loop however I need the api to do the actual reading and decoding. I tried this but it would just fall through the await which I guess makes sense since I said 0 bytes.
async def run(self):
while True:
print('A')
await asyncio.get_event_loop().sock_recv(self._q._connection, 0)
print('B')
msg = self._q.receive()
print(msg)
The only solution I can think of for now is to add a small timeout to the select and then call asyncio.sleep
while there is no data but this seems like an inefficent approach. I wish there was something like asyncio.select
. Do anyone want to recommend another approach?
EDIT: Right now I have come up with this. I don't like it because it adds an extra quarter second latency (probably doesn't matter much for my application but it still bugs me.)
async def run(self):
while True:
if select.select([self._q._connection], [], [], 0)[0]:
print(self._q.receive())
else:
await asyncio.sleep(0.25)
You could use loop.add_reader to wait for the read availability of your socket:
async def watch(fd):
future = asyncio.Future()
loop.add_reader(fd, future.set_result, None)
future.add_done_callback(lambda f: loop.remove_reader(fd))
await future
async def run(self):
while True:
await watch(self._q._connection)
msg = self._q.receive()
print(msg)
However, it'll be very tricky to avoid all the blocking IO calls of the library you mentioned without rewriting it completely. Instead, I'd recommend to use the loop.run_in_executor method to schedule the blocking IO calls in a thread pool:
async def run(self):
loop = asyncio.get_event_loop()
while True:
msg = await loop.run_in_executor(None, self._q.receive)
print(msg)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With