
How to implement recvmsg() with asyncio?

Since recvmsg() is missing from the asyncio module, I tried to reimplement it in exactly the same way that BaseEventLoop.sock_recv() is implemented:

import asyncio, socket


def _sock_recvmsg(loop, fut, registered, sock, bufsize, ancbufsize):
    self = loop
    fd = sock.fileno()
    if registered:
        self.remove_reader(fd)
    if fut.cancelled():
        return
    try:
        data = sock.recvmsg(bufsize, ancbufsize)
    except (BlockingIOError, InterruptedError):
        self.add_reader(fd, self._sock_recvmsg, fut, True, sock, bufsize, ancbufsize)
    except Exception as exc:
        fut.set_exception(exc)
    else:
        fut.set_result(data)


def sock_recvmsg(loop, sock, bufsize, ancbufsize=0):
    self = loop
    if self._debug and sock.gettimeout() != 0:
        raise ValueError('the socket must be non-blocking')
    fut = asyncio.futures.Future(loop=self)
    self._sock_recvmsg(fut, False, sock, bufsize, ancbufsize)
    return fut


asyncio.unix_events._UnixSelectorEventLoop._sock_recvmsg = _sock_recvmsg
asyncio.unix_events._UnixSelectorEventLoop.sock_recvmsg = sock_recvmsg

But this trivial test fails. Only the first value is received, and the test hangs after that:

async def produce():
    for i in range(5):
        end_out.sendmsg([bytes([i])])
        await asyncio.sleep(1)

    end_out.close()

async def consume():
    while True:
        v, a, b, c = await asyncio.get_event_loop().sock_recvmsg(end_in, 1)
        if v == b'': return
        print(v)

if __name__ == '__main__':
    end_in, end_out = socket.socketpair()
    asyncio.ensure_future(produce())
    asyncio.get_event_loop().run_until_complete(consume())

What am I missing?

Grief asked Sep 07 '25 06:09


1 Answer

The recvmsg()-related code is correct, but I forgot to put both sockets into non-blocking mode:

end_in.setblocking(0)
end_out.setblocking(0)

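With blocking sockets, the second sock.recvmsg() call blocks the whole event loop, so produce() never gets a chance to send the next value. That is such a silly mistake that I'm considering deleting this question.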
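For anyone who would rather not patch private event loop classes, here is a minimal sketch of the same idea built only on the public add_reader()/remove_reader() calls. The names recvmsg_async and main are made up for this illustration, and it assumes a Unix selector event loop and Python 3.7+ (for asyncio.run and asyncio.get_running_loop). Treat it as a starting point, not a drop-in replacement:

```python
import asyncio
import socket


def recvmsg_async(loop, sock, bufsize, ancbufsize=0):
    """Hypothetical helper: future that resolves to sock.recvmsg(...)."""
    fut = loop.create_future()
    fd = sock.fileno()

    def on_readable():
        # One-shot reader: unregister first, re-register only if needed.
        loop.remove_reader(fd)
        if fut.done():  # e.g. the future was cancelled while waiting
            return
        try:
            result = sock.recvmsg(bufsize, ancbufsize)
        except (BlockingIOError, InterruptedError):
            # Spurious wakeup: wait for the next readability event.
            loop.add_reader(fd, on_readable)
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(result)

    loop.add_reader(fd, on_readable)
    return fut


async def main():
    loop = asyncio.get_running_loop()
    end_in, end_out = socket.socketpair()
    # The step that was missing in the question.
    end_in.setblocking(False)
    end_out.setblocking(False)

    async def produce():
        for i in range(5):
            end_out.sendmsg([bytes([i])])
            await asyncio.sleep(0.01)
        end_out.close()

    received = []
    producer = asyncio.ensure_future(produce())
    while True:
        data, ancdata, flags, addr = await recvmsg_async(loop, end_in, 1)
        if data == b'':  # peer closed the connection
            break
        received.append(data)
    await producer
    end_in.close()
    return received


if __name__ == '__main__':
    print(asyncio.run(main()))
```

Unlike the version in the question, this sketch always waits for a readability event before calling recvmsg(), so it never tries the call eagerly. That costs one extra loop iteration when data is already available, but keeps the code short.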

Grief answered Sep 11 '25 00:09