Python asyncio: how to read stdin and write to stdout?

I need to read stdin asynchronously to receive messages (JSON terminated by \r\n) and, after processing each one, write the updated message to stdout asynchronously.

At the moment I am doing it synchronously, like this:

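import sys


class SyncIOStdInOut:
    def write(self, payload: str) -> None:
        # Send one message terminated by \r\n and flush immediately.
        sys.stdout.write(payload)
        sys.stdout.write('\r\n')
        sys.stdout.flush()

    def read(self) -> str:
        # Block until a full line is available on stdin.
        payload = sys.stdin.readline()
        return payload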

How can I do the same asynchronously?

user3225309 asked Oct 11 '20 11:10


2 Answers

Here's an example that echoes stdin to stdout using asyncio streams (Unix only).

import asyncio
import sys


async def connect_stdin_stdout():
    loop = asyncio.get_event_loop()
    reader = asyncio.StreamReader()
    protocol = asyncio.StreamReaderProtocol(reader)
    await loop.connect_read_pipe(lambda: protocol, sys.stdin)
    w_transport, w_protocol = await loop.connect_write_pipe(asyncio.streams.FlowControlMixin, sys.stdout)
    writer = asyncio.StreamWriter(w_transport, w_protocol, reader, loop)
    return reader, writer


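async def main():
    reader, writer = await connect_stdin_stdout()
    while True:
        res = await reader.read(100)
        if not res:  # empty bytes means EOF on stdin
            break
        writer.write(res)
        await writer.drain()  # let the transport flush before reading more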

if __name__ == "__main__":
    asyncio.run(main())

As a ready-to-use solution, you could use the aioconsole library. It implements a similar approach and also provides useful asynchronous equivalents of input, print, exec and code.interact:

from aioconsole import get_standard_streams

async def main():
    reader, writer = await get_standard_streams()

Update:

Let's walk through how the connect_stdin_stdout function works.

  1. Get the current event loop:
loop = asyncio.get_event_loop()
  2. Create a StreamReader instance:
reader = asyncio.StreamReader()

Generally, the StreamReader and StreamWriter classes are not intended to be instantiated directly; they are normally obtained from functions such as open_connection() and start_server(). StreamReader provides a buffered asynchronous interface to a data stream. Some source (library code) calls its feed_data() and feed_eof() methods; the data is buffered and can then be read through the documented coroutines read(), readline(), and so on.
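To make the buffering concrete, here is a minimal sketch in which the test code itself plays the role of the "library code" and feeds the reader by hand. The function name demo_stream_reader and the sample payloads are made up for illustration; in real use a protocol calls feed_data() and feed_eof() for you.

```python
import asyncio


async def demo_stream_reader() -> list:
    # Instantiated directly only for illustration; normally a protocol
    # owns the reader and feeds it.
    reader = asyncio.StreamReader()

    # Data may arrive in arbitrary chunks; the reader buffers it.
    reader.feed_data(b'{"id": 1}\r\n{"id": ')
    reader.feed_data(b'2}\r\n')
    reader.feed_eof()

    lines = []
    while True:
        line = await reader.readline()  # splits on b"\n", keeps the terminator
        if not line:  # b"" signals EOF
            break
        lines.append(line)
    return lines


if __name__ == "__main__":
    print(asyncio.run(demo_stream_reader()))
```

Note that the second message is only returned once both of its chunks have been fed, which is the point of the buffer.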

  3. Create a StreamReaderProtocol instance:
protocol = asyncio.StreamReaderProtocol(reader)

This class is derived from asyncio.Protocol and FlowControlMixin and adapts between Protocol and StreamReader. It overrides Protocol methods such as data_received() and eof_received() and forwards them to the StreamReader methods feed_data() and feed_eof().
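The adapter role can be seen by calling the protocol callbacks by hand, as a transport normally would. This is only a sketch for illustration: demo_protocol is a hypothetical name, and no real transport is attached here.

```python
import asyncio


async def demo_protocol() -> bytes:
    reader = asyncio.StreamReader()
    protocol = asyncio.StreamReaderProtocol(reader)

    # A transport would normally invoke these callbacks as data arrives.
    protocol.data_received(b"hello ")
    protocol.data_received(b"world")
    protocol.eof_received()

    # Everything passed to the protocol is now readable from the reader.
    return await reader.read()


if __name__ == "__main__":
    print(asyncio.run(demo_protocol()))
```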

  4. Register the standard input stream stdin in the event loop:
await loop.connect_read_pipe(lambda: protocol, sys.stdin)

The connect_read_pipe function takes a file-like object as its pipe parameter, and stdin is a file-like object. From now on, all data read from stdin goes to the StreamReaderProtocol and is then passed on to the StreamReader.

  5. Register the standard output stream stdout in the event loop:
w_transport, w_protocol = await loop.connect_write_pipe(asyncio.streams.FlowControlMixin, sys.stdout)

connect_write_pipe needs a protocol factory that creates protocol instances implementing the flow control logic for StreamWriter.drain(). This logic is implemented in the FlowControlMixin class, from which StreamReaderProtocol also inherits.

  6. Create a StreamWriter instance:
writer = asyncio.StreamWriter(w_transport, w_protocol, reader, loop)

This class forwards the data passed to it through write(), writelines(), and so on to the underlying transport.

protocol is used to support drain(), which waits until the underlying transport's write buffer has drained enough for writing to resume.

reader is an optional parameter and can be None. It is also used to support drain(): at the start of drain(), the reader is checked for a stored exception, for example due to a lost connection (relevant for sockets and bidirectional connections), and if one is set, drain() raises it as well.

You can read more about StreamWriter and drain() function in this great answer.
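The same wiring can be tried without touching the terminal. The sketch below assumes a Unix system and uses an os.pipe() pair as a stand-in for stdin and stdout; pipe_round_trip is a hypothetical helper name, and asyncio.streams.FlowControlMixin is the same undocumented class used in the answer above. The writer is created with reader=None, which the StreamWriter constructor accepts.

```python
import asyncio
import os


async def pipe_round_trip(payload: bytes) -> bytes:
    loop = asyncio.get_running_loop()
    read_fd, write_fd = os.pipe()

    # Read side: same pattern as for sys.stdin.
    reader = asyncio.StreamReader()
    r_transport, _ = await loop.connect_read_pipe(
        lambda: asyncio.StreamReaderProtocol(reader),
        os.fdopen(read_fd, "rb", 0),
    )

    # Write side: same pattern as for sys.stdout.
    w_transport, w_protocol = await loop.connect_write_pipe(
        asyncio.streams.FlowControlMixin,
        os.fdopen(write_fd, "wb", 0),
    )
    writer = asyncio.StreamWriter(w_transport, w_protocol, None, loop)

    writer.write(payload)
    await writer.drain()  # returns once the transport can accept more data
    writer.close()        # closes the write end, so the reader sees EOF

    data = await reader.read()  # read until EOF
    r_transport.close()
    return data


if __name__ == "__main__":
    print(asyncio.run(pipe_round_trip(b'{"ok": true}\r\n')))
```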

Update 2:

To read lines terminated by \r\n, reader.readuntil(b'\r\n') can be used.
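A minimal sketch of that approach, assuming each message is a JSON document followed by \r\n as in the question. The names read_messages and demo are made up for illustration, and the reader is fed by hand so the example runs without a terminal; with the code above you would pass the reader returned by connect_stdin_stdout() instead.

```python
import asyncio
import json


async def read_messages(reader: asyncio.StreamReader) -> list:
    messages = []
    while True:
        try:
            # Returns the data up to and including the separator.
            raw = await reader.readuntil(b"\r\n")
        except asyncio.IncompleteReadError:
            # EOF reached before another separator; any partial data is
            # ignored in this sketch.
            break
        messages.append(json.loads(raw[:-2]))  # strip the trailing \r\n
    return messages


async def demo() -> list:
    reader = asyncio.StreamReader()
    reader.feed_data(b'{"a": 1}\r\n{"b": 2}\r\n')
    reader.feed_eof()
    return await read_messages(reader)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

readuntil() can also raise asyncio.LimitOverrunError when a message exceeds the reader's buffer limit; that case is not handled here.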

alex_noname answered Nov 14 '22 23:11


This is another way to read from stdin asynchronously (it reads a single line at a time).

import asyncio
import sys

async def async_read_stdin() -> str:
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, sys.stdin.readline)
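
Extending the same idea to writing gives an asynchronous counterpart of the class in the question. This is a sketch, not a tested drop-in: the class name AsyncIOStdInOut and the optional stdin/stdout constructor arguments are assumptions added here so the streams can be swapped out in tests.

```python
import asyncio
import sys


class AsyncIOStdInOut:
    def __init__(self, stdin=None, stdout=None):
        # Default to the real standard streams; injectable for testing.
        self._stdin = sys.stdin if stdin is None else stdin
        self._stdout = sys.stdout if stdout is None else stdout

    def _write_sync(self, payload: str) -> None:
        # Blocking write, same as the synchronous version in the question.
        self._stdout.write(payload)
        self._stdout.write('\r\n')
        self._stdout.flush()

    async def write(self, payload: str) -> None:
        loop = asyncio.get_running_loop()
        # Run the blocking write in the default thread pool executor.
        await loop.run_in_executor(None, self._write_sync, payload)

    async def read(self) -> str:
        loop = asyncio.get_running_loop()
        # readline blocks, so it also runs in a worker thread.
        return await loop.run_in_executor(None, self._stdin.readline)
```

One caveat of the executor approach: a worker thread blocked in readline() cannot be cancelled, so a pending read may keep the program from exiting until stdin delivers a line or reaches EOF.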

David Beesley answered Nov 14 '22 21:11