I need to asynchronously read from stdin to get messages (JSON terminated by \r\n) and, after processing, asynchronously write the updated message to stdout.
At the moment I am doing it synchronously, like this:
import sys

class SyncIOStdInOut:
    def write(self, payload: str):
        sys.stdout.write(payload)
        sys.stdout.write('\r\n')
        sys.stdout.flush()

    def read(self) -> str:
        payload = sys.stdin.readline()
        return payload
How to do the same but asynchronously?
Here's an example that echoes stdin to stdout using asyncio streams (Unix only).
import asyncio
import sys

async def connect_stdin_stdout():
    loop = asyncio.get_event_loop()
    # Wire sys.stdin into a StreamReader via a StreamReaderProtocol.
    reader = asyncio.StreamReader()
    protocol = asyncio.StreamReaderProtocol(reader)
    await loop.connect_read_pipe(lambda: protocol, sys.stdin)
    # Wire sys.stdout into a StreamWriter; FlowControlMixin provides
    # the flow-control logic that StreamWriter.drain() relies on.
    w_transport, w_protocol = await loop.connect_write_pipe(asyncio.streams.FlowControlMixin, sys.stdout)
    writer = asyncio.StreamWriter(w_transport, w_protocol, reader, loop)
    return reader, writer

async def main():
    reader, writer = await connect_stdin_stdout()
    while True:
        res = await reader.read(100)
        if not res:
            break
        # Echo back (for strict flow control you could await writer.drain() here).
        writer.write(res)

if __name__ == "__main__":
    asyncio.run(main())
As a ready-to-use solution, you could use the aioconsole library. It implements a similar approach, but also provides additional useful asynchronous equivalents of input, print, exec and code.interact:
from aioconsole import get_standard_streams

async def main():
    reader, writer = await get_standard_streams()
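For instance, a minimal echo loop on top of aioconsole might look like this (a sketch that assumes only the documented get_standard_streams() entry point):

import asyncio
from aioconsole import get_standard_streams

async def main():
    reader, writer = await get_standard_streams()
    while True:
        line = await reader.readline()
        if not line:  # empty bytes means EOF
            break
        writer.write(line)
        await writer.drain()

asyncio.run(main())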
Let's try to figure out how the connect_stdin_stdout function works.
1. Get the current event loop.

loop = asyncio.get_event_loop()

2. Create a StreamReader instance.

reader = asyncio.StreamReader()
Generally, the StreamReader/StreamWriter classes are not intended to be instantiated directly and should only be used as a result of functions such as open_connection() and start_server().

StreamReader provides a buffered asynchronous interface to some data stream. Some source (library code) calls its functions such as feed_data and feed_eof; the data is buffered and can be read using the documented coroutine interface: read(), readline(), etc.
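A self-contained sketch of this buffering contract, feeding the reader by hand instead of from a real pipe:

import asyncio

async def demo():
    reader = asyncio.StreamReader()
    # Normally library code (a protocol) calls these; here we do it by hand.
    reader.feed_data(b'{"msg": "hi"}\r\n')
    reader.feed_eof()
    print(await reader.readline())  # b'{"msg": "hi"}\r\n'
    print(reader.at_eof())          # True

asyncio.run(demo())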
3. Create a StreamReaderProtocol instance.

protocol = asyncio.StreamReaderProtocol(reader)

This class is derived from asyncio.Protocol and FlowControlMixin and adapts between Protocol and StreamReader. It overrides Protocol methods such as data_received and eof_received and calls StreamReader methods such as feed_data.
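To see the adaptation in isolation, you can invoke the Protocol callbacks by hand (a sketch; a transport would normally call these, so this pokes at behavior that is an implementation detail):

import asyncio

async def demo():
    reader = asyncio.StreamReader()
    protocol = asyncio.StreamReaderProtocol(reader)
    # data_received() forwards the bytes to reader.feed_data(),
    # eof_received() calls reader.feed_eof().
    protocol.data_received(b"ping\r\n")
    protocol.eof_received()
    print(await reader.readline())  # b'ping\r\n'

asyncio.run(demo())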
4. Register stdin in the event loop.

await loop.connect_read_pipe(lambda: protocol, sys.stdin)

The connect_read_pipe function takes a file-like object as its pipe parameter, and stdin is a file-like object. From now on, all data read from stdin will go into the StreamReaderProtocol and then be passed on to the StreamReader.
5. Register stdout in the event loop.

w_transport, w_protocol = await loop.connect_write_pipe(asyncio.streams.FlowControlMixin, sys.stdout)

connect_write_pipe needs to be passed a protocol factory that creates protocol instances implementing the flow-control logic for StreamWriter.drain(). That logic is implemented in the FlowControlMixin class; StreamReaderProtocol also inherits from it.
6. Create a StreamWriter instance.

writer = asyncio.StreamWriter(w_transport, w_protocol, reader, loop)

This class forwards the data passed to its write(), writelines(), etc. methods to the underlying transport.

The protocol is used to support the drain() function, which waits until the underlying transport has flushed its internal buffer and is available for writing again.

The reader is an optional parameter and can be None; it is also used to support drain(): at the start of that function, it is checked whether an exception has been set on the reader, for example due to a lost connection (relevant for sockets and bidirectional connections), in which case drain() will also raise the exception.
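Putting it together, an asynchronous counterpart of the write() method from the question might look like this (a sketch; write_message is a hypothetical helper name, not part of asyncio):

import asyncio

async def write_message(writer: asyncio.StreamWriter, payload: str) -> None:
    # Hypothetical helper: async counterpart of the synchronous write().
    writer.write(payload.encode())
    writer.write(b"\r\n")
    await writer.drain()  # wait until the transport buffer is flushed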
You can read more about StreamWriter and the drain() function in this great answer.
To read lines with the \r\n separator, readuntil() can be used.
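For the JSON-over-\r\n framing from the question, that could look like this (a sketch; read_json_message is a hypothetical helper):

import asyncio
import json

async def read_json_message(reader: asyncio.StreamReader) -> dict:
    # readuntil() returns the data including the b"\r\n" separator;
    # json.loads() tolerates the trailing whitespace.
    raw = await reader.readuntil(separator=b"\r\n")
    return json.loads(raw)

Note that readuntil() raises asyncio.IncompleteReadError if EOF is reached before the separator is found.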
This is another way you can asynchronously read from stdin (it reads a single line at a time), by delegating the blocking readline() call to an executor:

import asyncio
import sys

async def async_read_stdin() -> str:
    loop = asyncio.get_event_loop()
    # Run the blocking readline() in the default executor (a thread pool)
    # so it doesn't block the event loop.
    return await loop.run_in_executor(None, sys.stdin.readline)
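A sketch of how this might be used in a loop, reusing the imports from the snippet above:

async def main():
    while True:
        line = await async_read_stdin()
        if not line:  # readline() returns '' on EOF
            break
        sys.stdout.write(line)
        sys.stdout.flush()

asyncio.run(main())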