 

Does Python's asyncio support a coroutine-based API for UDP networking?

I was browsing the Python asyncio documentation tonight, looking for ideas for one of my course projects, but I soon found that the standard asyncio module seems to be missing a feature.

If you look through the documentation, you'll find that there is a callback-based API and a coroutine-based API. The callback API can be used to build both UDP and TCP applications, while the coroutine API appears to support only TCP, because it is built around a stream-style API.

This is a problem for me because I was looking for a coroutine-based API for UDP networking. I did find that asyncio supports low-level coroutine-based socket methods such as sock_recv and sock_sendall, but the crucial APIs for UDP networking, recvfrom and sendto, are not there.

What I would like to write is something like:

async def handle_income_packet(sock):
    # wished-for coroutine API: recvfrom/sendto are awaitable here
    data, addr = await sock.recvfrom(4096)
    # data handling here...
    await sock.sendto(response, addr)

I know that this could be implemented equivalently with the callback API, but the problem is that callbacks are regular functions, not coroutines, so inside them you cannot yield control back to the event loop while preserving the function's execution state.

Look at the code above: if we need to do some I/O in the data-handling part, the coroutine version has no problem as long as those I/O operations are done in coroutines as well:

async def handle_income_packet(sock):
    data, addr = await sock.recvfrom(4096)
    async with aiohttp.ClientSession() as session:
        info = await session.get(...)
    response = generate_response_from_info(info)
    await sock.sendto(response, addr)

As long as we use await, the event loop takes over the control flow at that point and handles other things until the I/O is done. Sadly, this code is not usable at the moment, because asyncio does not provide coroutine versions of socket.sendto and socket.recvfrom.
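For readers on newer Python versions: Python 3.11 added the low-level event loop methods loop.sock_recvfrom() and loop.sock_sendto(), which come close to the code wished for above. The following is only a minimal sketch under that assumption; echo_once, demo and HAS_SOCK_RECVFROM are illustrative names, not part of asyncio.

```python
import asyncio
import socket
import sys

# loop.sock_recvfrom() and loop.sock_sendto() were added in Python 3.11.
HAS_SOCK_RECVFROM = sys.version_info >= (3, 11)


async def echo_once(sock):
    """Receive one datagram and send it back to its sender."""
    loop = asyncio.get_running_loop()
    data, addr = await loop.sock_recvfrom(sock, 4096)
    await loop.sock_sendto(sock, data, addr)


async def demo():
    """Round-trip one datagram between two local non-blocking sockets."""
    loop = asyncio.get_running_loop()
    server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # The loop.sock_* methods require non-blocking sockets.
        server.setblocking(False)
        client.setblocking(False)
        server.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
        echo_task = asyncio.create_task(echo_once(server))
        await loop.sock_sendto(client, b"ping", server.getsockname())
        reply, _ = await asyncio.wait_for(
            loop.sock_recvfrom(client, 4096), timeout=5
        )
        await echo_task
        return reply
    finally:
        server.close()
        client.close()
```

On older interpreters these two methods do not exist, so the transport/protocol API remains the way to go there.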

What we can do instead is use the transport/protocol callback API:

class EchoServerClientProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        self.transport = transport

    def data_received(self, data):
        # blocking call: nothing else runs on the loop until it returns
        info = requests.get(...)
        response = generate_response_from_info(info)
        self.transport.write(response)
        self.transport.close()

We cannot await a coroutine there because callbacks are not coroutines, and a blocking I/O call like the one above stalls the callback and prevents the loop from handling any other events until the I/O is done.

Another recommended approach is to create a Future object in the data_received function, add it to the event loop, store any needed state in the Protocol class, and then explicitly return control to the loop. While this could work, it requires a lot of complex code that the coroutine version does not need at all.

There is also an example here of using a non-blocking socket and add_reader to handle UDP sockets, but that code still looks complex compared to the few lines of the coroutine version.
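To make the add_reader idea concrete, here is a rough sketch of how a non-blocking socket plus loop.add_reader() can be wrapped into something awaitable. The recvfrom and demo helpers are hypothetical names of mine, not the code from the linked example, and add_reader() is not available on every event loop (for instance the Windows proactor loop).

```python
import asyncio
import socket


def recvfrom(loop, sock, nbytes):
    """Return a Future that resolves to (data, addr) for the next datagram."""
    fd = sock.fileno()
    future = loop.create_future()

    def on_readable():
        if future.done():
            return
        try:
            result = sock.recvfrom(nbytes)
        except (BlockingIOError, InterruptedError):
            return  # spurious wake-up; keep waiting
        except OSError as exc:
            future.set_exception(exc)
        else:
            future.set_result(result)

    loop.add_reader(fd, on_readable)
    # Unregister the reader however the future finishes (result, error, cancel).
    future.add_done_callback(lambda _: loop.remove_reader(fd))
    return future


async def demo():
    """Send one datagram to a local socket and await it via recvfrom()."""
    loop = asyncio.get_running_loop()
    receiver = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        receiver.setblocking(False)
        receiver.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
        sender.sendto(b"hello", receiver.getsockname())
        data, _addr = await asyncio.wait_for(
            recvfrom(loop, receiver, 4096), timeout=5
        )
        return data
    finally:
        receiver.close()
        sender.close()
```

Even in this reduced form, the bookkeeping around the reader callback is noticeably heavier than the two awaits in the coroutine version.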

The point I want to make is that coroutines are a really good design: they exploit concurrency within a single thread while keeping a straightforward programming pattern that saves both brainpower and unnecessary lines of code. But the crucial piece needed to make this work for UDP networking is missing from the asyncio standard library.

What do you guys think about this?

Also, if anyone can suggest third-party libraries that support this kind of API for UDP networking, I would be extremely grateful, for the sake of my course project. I found that Bluelet is something like this, but it does not seem to be actively maintained.

edit:

It seems that this PR did implement this feature but was rejected by the asyncio developers. The developers claim that everything can be implemented with create_datagram_endpoint(), the transport/protocol API. But as I discussed above, a coroutine API is simpler than the callback API in many use cases, and it is really unfortunate that we do not have one for UDP.
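For what it's worth, one way to get a coroutine-style interface on top of create_datagram_endpoint() is to have the protocol push incoming datagrams into an asyncio.Queue. This is only a sketch under my own assumptions: QueueProtocol, Endpoint, open_endpoint and demo are made-up names, and the queue is unbounded, so a real application would want some back-pressure.

```python
import asyncio


class QueueProtocol(asyncio.DatagramProtocol):
    """Store every incoming datagram in a queue for later retrieval."""

    def __init__(self):
        self.queue = asyncio.Queue()

    def datagram_received(self, data, addr):
        self.queue.put_nowait((data, addr))


class Endpoint:
    """Thin coroutine-friendly wrapper around a datagram transport."""

    def __init__(self, transport, protocol):
        self._transport = transport
        self._protocol = protocol

    @property
    def address(self):
        return self._transport.get_extra_info("sockname")[:2]

    async def recvfrom(self):
        # Suspends the caller until a datagram is available.
        return await self._protocol.queue.get()

    def sendto(self, data, addr):
        # transport.sendto() never blocks, so no await is needed here.
        self._transport.sendto(data, addr)

    def close(self):
        self._transport.close()


async def open_endpoint(host="127.0.0.1", port=0):
    loop = asyncio.get_running_loop()
    transport, protocol = await loop.create_datagram_endpoint(
        QueueProtocol, local_addr=(host, port)
    )
    return Endpoint(transport, protocol)


async def demo():
    """Send b'hello' to a server endpoint and get it back upper-cased."""
    server = await open_endpoint()
    client = await open_endpoint()
    try:
        client.sendto(b"hello", server.address)
        data, addr = await asyncio.wait_for(server.recvfrom(), timeout=5)
        server.sendto(data.upper(), addr)
        reply, _ = await asyncio.wait_for(client.recvfrom(), timeout=5)
        return reply
    finally:
        client.close()
        server.close()
```

With such a wrapper the handler reads much like the wished-for code: await endpoint.recvfrom(), do any awaited work, then call endpoint.sendto().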

Chaser hkj asked Feb 05 '18 11:02



2 Answers

A stream-based API is not provided because streams offer ordering on top of the callbacks, while UDP communication is inherently unordered, so the two are fundamentally incompatible.

But none of that means you can't invoke coroutines from your callbacks - it's in fact quite easy! Starting from the EchoServerProtocol example, you can do this:

def datagram_received(self, data, addr):
    loop = asyncio.get_event_loop()
    loop.create_task(self.handle_income_packet(data, addr))

async def handle_income_packet(self, data, addr):
    # echo back the message, but 2 seconds later
    await asyncio.sleep(2)
    self.transport.sendto(data, addr)

Here datagram_received starts your handle_income_packet coroutine, which is free to await any number of coroutines. Since the coroutine runs in the "background", the event loop is not blocked at any point and datagram_received returns immediately, just as intended.
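For completeness, a self-contained version of the same idea might look like the sketch below. The class names, the demo coroutine and the shortened delay are my own choices for illustration. The protocol also keeps a reference to each task it spawns, since the event loop itself only holds weak references to tasks.

```python
import asyncio


class DelayedEchoProtocol(asyncio.DatagramProtocol):
    """Echo each datagram back after a delay, one task per datagram."""

    def __init__(self, delay):
        self.delay = delay
        self.transport = None
        self.tasks = set()  # strong references keep pending tasks alive

    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        # Plain callback: schedule the coroutine and return immediately.
        loop = asyncio.get_running_loop()
        task = loop.create_task(self.handle_income_packet(data, addr))
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)

    async def handle_income_packet(self, data, addr):
        await asyncio.sleep(self.delay)  # any awaitable work can go here
        self.transport.sendto(data, addr)


class ClientProtocol(asyncio.DatagramProtocol):
    """Resolve a future with the first datagram received."""

    def __init__(self, reply):
        self.reply = reply

    def datagram_received(self, data, addr):
        if not self.reply.done():
            self.reply.set_result(data)


async def demo(delay=0.1):
    loop = asyncio.get_running_loop()
    server_transport, _ = await loop.create_datagram_endpoint(
        lambda: DelayedEchoProtocol(delay), local_addr=("127.0.0.1", 0)
    )
    host, port = server_transport.get_extra_info("sockname")[:2]
    reply = loop.create_future()
    client_transport, _ = await loop.create_datagram_endpoint(
        lambda: ClientProtocol(reply), remote_addr=(host, port)
    )
    try:
        client_transport.sendto(b"ping")
        return await asyncio.wait_for(reply, timeout=5)
    finally:
        client_transport.close()
        server_transport.close()
```

Running asyncio.run(demo()) sends one datagram to the server and returns the echoed bytes once the delayed handler has replied.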

user4815162342 answered Nov 17 '22 21:11



You might be interested in this module providing high-level UDP endpoints for asyncio:

async def main():
    # Create a local UDP endpoint
    local = await open_local_endpoint('localhost', 8888)

    # Create a remote UDP endpoint, pointing to the first one
    remote = await open_remote_endpoint(*local.address)

    # The remote endpoint sends a datagram
    remote.send(b'Hey Hey, My My')

    # The local endpoint receives the datagram, along with the address
    data, address = await local.receive()

    # Print: Got 'Hey Hey, My My' from 127.0.0.1 port 50603
    print(f"Got {data!r} from {address[0]} port {address[1]}")
Vincent answered Nov 17 '22 20:11
