I'm reading the documentation for the Protocol and Transport classes in the asyncio package. Specifically:
When subclassing a protocol class, it is recommended you override certain methods. Those methods are callbacks: they will be called by the transport on certain events (for example when some data is received); you shouldn’t call them yourself, unless you are implementing a transport.
Emphasis added
So, in principle, it should be possible to implement a transport, but...
Transports are classes provided by asyncio in order to abstract various kinds of communication channels. You generally won’t instantiate a transport yourself; instead, you will call an AbstractEventLoop method (which one?) which will create the transport and try to initiate the underlying communication channel, calling you back when it succeeds.
Again, emphasis added
Having read the section on AbstractEventLoop from end to end, I don't see any way of creating a custom transport. The closest it comes is AbstractEventLoop.create_connection(protocol_factory, host=...), which only implies that it will create some kind of socket...
Well, my ultimate goal is to use a custom transport which isn't any kind of socket (perhaps a StringIO, perhaps a database connection cursor, perhaps another protocol).
So, is this just a well-intended but never implemented feature that the documentation mentions by mistake, or is there actually a way to implement a custom transport without monkey-patching asyncio and sacrificing firstborns?
A bit of boilerplate code is needed, although it's not too much:
from asyncio import streams, transports, get_running_loop


class CustomTransport(transports.Transport):

    def __init__(self, loop, protocol, *args, **kwargs):
        super().__init__()  # sets up the storage behind get_extra_info()
        self._loop = loop
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def set_protocol(self, protocol):
        self._protocol = protocol

    # Implement read/write methods
    # [...]


async def custom_create_connection(protocol_factory, *args, **kwargs):
    loop = get_running_loop()
    protocol = protocol_factory()
    transport = CustomTransport(loop, protocol, *args, **kwargs)
    # Hand the transport to the protocol, as asyncio's built-in transports do
    protocol.connection_made(transport)
    return transport, protocol


async def custom_open_connection(*args, **kwargs):
    loop = get_running_loop()
    reader = streams.StreamReader()
    protocol = streams.StreamReaderProtocol(reader)
    factory = lambda: protocol
    transport, _ = await custom_create_connection(factory, *args, **kwargs)
    # StreamWriter takes the event loop as its fourth argument
    writer = streams.StreamWriter(transport, protocol, reader, loop)
    return reader, writer
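To make the outline above concrete, here is a minimal sketch of one possible transport that is not backed by a socket. It assumes a purely in-memory "loopback" channel: whatever is written to the transport is handed straight back to the protocol as received data. The names LoopbackTransport, open_loopback_connection and demo are hypothetical and exist only for this illustration; a transport wrapping a StringIO or a database cursor would need its own write() and close() logic.

```python
import asyncio


class LoopbackTransport(asyncio.Transport):
    """Hypothetical in-memory transport: written bytes are echoed to the protocol."""

    def __init__(self, loop, protocol):
        super().__init__()  # initialises the dict used by get_extra_info()
        self._loop = loop
        self._protocol = protocol
        self._closing = False
        # Schedule connection_made() on the loop instead of calling it inline.
        self._loop.call_soon(self._protocol.connection_made, self)

    def get_protocol(self):
        return self._protocol

    def set_protocol(self, protocol):
        self._protocol = protocol

    def is_closing(self):
        return self._closing

    def write(self, data):
        if self._closing:
            raise RuntimeError("transport is closing")
        # Deliver the data back to the protocol on the next loop iteration.
        self._loop.call_soon(self._protocol.data_received, bytes(data))

    def close(self):
        if not self._closing:
            self._closing = True
            # None signals a clean shutdown rather than an error.
            self._loop.call_soon(self._protocol.connection_lost, None)


async def open_loopback_connection():
    """Wire the transport into asyncio's stream reader/writer pair."""
    loop = asyncio.get_running_loop()
    reader = asyncio.StreamReader()
    protocol = asyncio.StreamReaderProtocol(reader)
    transport = LoopbackTransport(loop, protocol)
    writer = asyncio.StreamWriter(transport, protocol, reader, loop)
    return reader, writer


async def demo():
    reader, writer = await open_loopback_connection()
    writer.write(b"hello\n")
    await writer.drain()
    line = await reader.readline()  # the line comes back via data_received()
    writer.close()
    await writer.wait_closed()
    return line


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The callbacks are scheduled with call_soon() so that the protocol is never re-entered from inside write() or close(); whether that ordering matters depends on the protocol being driven, so treat it as a design choice of this sketch rather than a requirement.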