
How to create custom transport for asyncio?

I'm reading the documentation for Protocol and Transport classes in asyncio package. Specifically:

When subclassing a protocol class, it is recommended you override certain methods. Those methods are callbacks: they will be called by the transport on certain events (for example when some data is received); you shouldn’t call them yourself, unless you are implementing a transport.

Emphasis added

So, in principle, it should be possible to implement a transport, but...

Transports are classes provided by asyncio in order to abstract various kinds of communication channels. You generally won’t instantiate a transport yourself; instead, you will call an AbstractEventLoop method (which one?) which will create the transport and try to initiate the underlying communication channel, calling you back when it succeeds.

Again, emphasis added

I have read the section on AbstractEventLoop from end to end and I don't see any way of creating a custom transport. The closest it comes is AbstractEventLoop.create_connection(protocol_factory, host=...), which only implies that it will create some kind of socket...

Well, my ultimate goal is to use a custom transport which isn't any kind of socket (perhaps a StringIO, perhaps a database connection cursor, perhaps another protocol).


So, is this just a well-intended but never implemented mistake in documentation, or is there actually a way to implement custom transport without monkey-patching asyncio and sacrificing firstborns?

Asked by wvxvw on Oct 16 '17 at 14:10


1 Answer

A bit of boiler-plate code is needed, although it's not too much:

from asyncio import streams, transports, get_event_loop


class CustomTransport(transports.Transport):

    def __init__(self, loop, protocol, *args, **kwargs):
        # BaseTransport.__init__ sets up the dict behind get_extra_info()
        super().__init__()
        self._loop = loop
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def set_protocol(self, protocol):
        self._protocol = protocol

    # Implement read/write methods

    # [...]


async def custom_create_connection(protocol_factory, *args, **kwargs):
    loop = get_event_loop()
    protocol = protocol_factory()
    transport = CustomTransport(loop, protocol, *args, **kwargs)
    # Hand the transport to the protocol, as the loop's own
    # create_connection() does once the channel is ready
    protocol.connection_made(transport)
    return transport, protocol


async def custom_open_connection(*args, **kwargs):
    loop = get_event_loop()
    reader = streams.StreamReader(loop=loop)
    protocol = streams.StreamReaderProtocol(reader, loop=loop)
    factory = lambda: protocol
    transport, _ = await custom_create_connection(factory, *args, **kwargs)
    # StreamWriter takes the loop as a required fourth argument
    writer = streams.StreamWriter(transport, protocol, reader, loop)
    return reader, writer
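
To illustrate what the elided read/write methods might look like, here is a minimal sketch of an in-memory transport that simply feeds every written chunk back to its own protocol. The names LoopbackTransport, open_loopback_connection and demo are hypothetical and chosen only for this example; a transport backed by a database cursor or another protocol would replace the body of write() with its own delivery logic. It assumes Python 3.7 or later (for get_running_loop and StreamWriter.wait_closed).

```python
import asyncio
from asyncio import streams, transports


class LoopbackTransport(transports.Transport):
    """Hypothetical transport: anything written is delivered back to the protocol."""

    def __init__(self, loop, protocol):
        super().__init__()  # sets up the dict used by get_extra_info()
        self._loop = loop
        self._protocol = protocol
        self._closing = False

    def get_protocol(self):
        return self._protocol

    def set_protocol(self, protocol):
        self._protocol = protocol

    def is_closing(self):
        return self._closing

    def write(self, data):
        if self._closing:
            raise RuntimeError("transport is closing")
        # Deliver on the next loop iteration, the way a real transport
        # calls the protocol back when an I/O event arrives.
        self._loop.call_soon(self._protocol.data_received, bytes(data))

    def close(self):
        if self._closing:
            return
        self._closing = True
        # Scheduled after any pending data_received callbacks.
        self._loop.call_soon(self._protocol.connection_lost, None)


async def open_loopback_connection():
    loop = asyncio.get_running_loop()
    reader = streams.StreamReader(loop=loop)
    protocol = streams.StreamReaderProtocol(reader, loop=loop)
    transport = LoopbackTransport(loop, protocol)
    protocol.connection_made(transport)  # we are the transport, so we call it
    writer = streams.StreamWriter(transport, protocol, reader, loop)
    return reader, writer


async def demo():
    reader, writer = await open_loopback_connection()
    writer.write(b"hello\n")
    await writer.drain()
    line = await reader.readline()  # receives the bytes written above
    writer.close()
    await writer.wait_closed()
    return line


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The transport itself invokes the protocol callbacks (connection_made, data_received, connection_lost), which is exactly the case the documentation carves out with "unless you are implementing a transport". No event loop method is involved in creating it.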
Answered by Vincent on Oct 19 '22 at 03:10