 

How to create asyncio stream reader/writer for stdin/stdout?

I need to write two programs which will be run as a parent process and its child. The parent process spawns the child, and then they communicate via a pair of pipes connected to the child's stdin and stdout. The communication is peer-to-peer, which is why I need asyncio. A simple read/reply loop won't do.

I have written the parent. That was no problem, because asyncio provides everything I need in create_subprocess_exec().
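For reference, the parent side can look roughly like the sketch below. The child here is a hypothetical one-liner that just echoes each line back, and `CHILD_CODE` and `talk_to_child()` are illustrative names, not part of the original program. The point is that `create_subprocess_exec()` hands the parent a ready-made `StreamWriter` (`proc.stdin`) and `StreamReader` (`proc.stdout`).

```python
import asyncio
import sys

# Hypothetical child, for illustration only: echoes every stdin line to stdout.
CHILD_CODE = (
    "import sys\n"
    "for line in sys.stdin:\n"
    "    sys.stdout.write(line)\n"
    "    sys.stdout.flush()\n"
)

async def talk_to_child(message: bytes) -> bytes:
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", CHILD_CODE,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )
    # proc.stdin is a StreamWriter, proc.stdout is a StreamReader.
    proc.stdin.write(message + b"\n")
    await proc.stdin.drain()
    reply = await proc.stdout.readline()
    proc.stdin.close()  # EOF on the child's stdin ends its loop
    await proc.wait()
    return reply.rstrip(b"\n")

if __name__ == "__main__":
    print(asyncio.run(talk_to_child(b"hello")))
```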

However, I don't know how to create a similar stream reader/writer in the child. I did not expect any problems, because the pipes are already created and file descriptors 0 and 1 are ready to use when the child process starts. No connection needs to be opened and no process needs to be spawned.
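One caveat worth knowing: on POSIX, the pipe transports behind `loop.connect_read_pipe()` and `loop.connect_write_pipe()` only accept pipes, sockets and character devices, and raise `ValueError` for anything else (for example when the child's stdin is redirected from a regular file). A minimal check the child could run on fds 0 and 1 before wiring them into asyncio might look like this; `is_pipe_like()` is a hypothetical helper name, not an asyncio API.

```python
import os
import stat
import tempfile

def is_pipe_like(fd: int) -> bool:
    """Return True if fd is a FIFO, socket or character device.

    Hypothetical helper: these are the file types the POSIX pipe transports
    accept; for anything else (e.g. a regular file) they raise ValueError.
    """
    mode = os.fstat(fd).st_mode
    return stat.S_ISFIFO(mode) or stat.S_ISSOCK(mode) or stat.S_ISCHR(mode)

def demo():
    # A pipe created with os.pipe() qualifies at both ends...
    r, w = os.pipe()
    try:
        pipes = (is_pipe_like(r), is_pipe_like(w))
    finally:
        os.close(r)
        os.close(w)
    # ...whereas a regular file does not.
    with tempfile.TemporaryFile() as f:
        regular = is_pipe_like(f.fileno())
    return pipes, regular

if __name__ == "__main__":
    # In the real child you would test sys.stdin.fileno() and sys.stdout.fileno().
    print(demo())
```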

This is my attempt, which does not work:

import asyncio
import sys

_DEFAULT_LIMIT = 64 * 1024

async def connect_stdin_stdout(limit=_DEFAULT_LIMIT, loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()
    reader = asyncio.StreamReader(limit=limit, loop=loop)
    protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
    r_transport, _ = await loop.connect_read_pipe(lambda: protocol, sys.stdin)
    w_transport, _ = await loop.connect_write_pipe(lambda: protocol, sys.stdout)
    writer = asyncio.StreamWriter(w_transport, protocol, reader, loop)
    return reader, writer

The problem is I have two transports where I should have one. The function fails, because it tries to set the protocol's transport twice:

await loop.connect_read_pipe(lambda: protocol, sys.stdin)
await loop.connect_write_pipe(lambda: protocol, sys.stdout)
# !!!! assert self._transport is None, 'Transport already set'

I tried to pass a dummy protocol to the first line, but this line is not correct either, because both transports are needed, not just one:

writer = asyncio.StreamWriter(w_transport, protocol, reader, loop)

I guess I need to combine two unidirectional transports to one bidirectional somehow. Or is my approach entirely wrong? Could you please give me some advice?


UPDATE: after some testing, this seems to work (but it does not look good to me):

async def connect_stdin_stdout(limit=_DEFAULT_LIMIT, loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()
    reader = asyncio.StreamReader(limit=limit, loop=loop)
    protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
    dummy = asyncio.Protocol()
    await loop.connect_read_pipe(lambda: protocol, sys.stdin) # sets read_transport
    w_transport, _ = await loop.connect_write_pipe(lambda: dummy, sys.stdout)
    writer = asyncio.StreamWriter(w_transport, protocol, reader, loop)
    return reader, writer
asked Aug 30 '18 at 05:08 by VPfB


1 Answer

Your first version fails because you are using the wrong protocol for the writer side; the StreamReaderProtocol implements hooks to react to incoming connections and data, something the writing side doesn't and shouldn't have to deal with.

The loop.connect_write_pipe() coroutine calls the protocol factory you pass in and returns the resulting transport together with the protocol instance it created. That protocol object is the one you want to use in the stream writer, instead of the protocol used for the reader.

Next, you do not want to pass the stdin reader to the stdout stream writer! That class assumes that the reader and writer are connected to the same file descriptor, and that's really not the case here.
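These points can be checked without spawning a child at all. The sketch below is POSIX only, and `open_pipe_streams()` is a hypothetical generalisation of the `stdio()` helper further down that takes arbitrary pipe file objects. It wires the read end of one `os.pipe()` to a reader and the write end to a writer: each end gets its own transport and protocol, the writer uses the protocol returned by `connect_write_pipe()` (the undocumented `asyncio.streams.FlowControlMixin`), and it is given `None` instead of the stdin reader.

```python
import asyncio
import os

async def open_pipe_streams(read_file, write_file, limit=2 ** 16):
    """Wrap a readable pipe and a writable pipe in a StreamReader/StreamWriter.

    Hypothetical helper (POSIX only): stdio() corresponds to calling it with
    sys.stdin and a binary file object for fd 1.
    """
    loop = asyncio.get_running_loop()

    # Read side: StreamReaderProtocol feeds incoming bytes into the reader.
    reader = asyncio.StreamReader(limit=limit)
    await loop.connect_read_pipe(
        lambda: asyncio.StreamReaderProtocol(reader), read_file)

    # Write side: a separate transport with its own protocol. The protocol
    # returned by connect_write_pipe() is the one the writer needs, because
    # drain() waits on it for backpressure.
    w_transport, w_protocol = await loop.connect_write_pipe(
        asyncio.streams.FlowControlMixin, write_file)

    # This write-only pipe has no associated reader, so pass None.
    writer = asyncio.StreamWriter(w_transport, w_protocol, None, loop)
    return reader, writer

async def demo():
    # One os.pipe(): its write end feeds its read end, standing in for the
    # two pipes a real child would have on fds 1 and 0.
    r_fd, w_fd = os.pipe()
    reader, writer = await open_pipe_streams(
        os.fdopen(r_fd, "rb", buffering=0), os.fdopen(w_fd, "wb", buffering=0))
    writer.write(b"ping\n")
    await writer.drain()
    line = await reader.readline()
    writer.close()              # closing the write end gives the reader EOF
    rest = await reader.read()  # b"" once EOF has been seen
    return line, rest

if __name__ == "__main__":
    print(asyncio.run(demo()))
```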

In the recent past I've built the following to handle stdio for a child process; the stdio() function is based on Nathan Hoad's gist on the subject, plus a fallback for Windows, where support for treating stdio as pipes is limited.

You do want the writer to handle backpressure properly, so my version uses the (undocumented) asyncio.streams.FlowControlMixin class as the protocol for this; you really don't need anything more than that:

import asyncio
import os
import sys

async def stdio(limit=asyncio.streams._DEFAULT_LIMIT, loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()

    if sys.platform == 'win32':
        return _win32_stdio(loop)

    reader = asyncio.StreamReader(limit=limit, loop=loop)
    await loop.connect_read_pipe(
        lambda: asyncio.StreamReaderProtocol(reader, loop=loop), sys.stdin)

    writer_transport, writer_protocol = await loop.connect_write_pipe(
        lambda: asyncio.streams.FlowControlMixin(loop=loop),
        os.fdopen(sys.stdout.fileno(), 'wb'))
    writer = asyncio.streams.StreamWriter(
        writer_transport, writer_protocol, None, loop)

    return reader, writer

def _win32_stdio(loop):
    # no support for asyncio stdio yet on Windows, see https://bugs.python.org/issue26832
    # use an executor to read from stdin and write to stdout
    # note: if nothing ever drains the writer explicitly, no flushing ever takes place!
    class Win32StdinReader:
        def __init__(self):
            self.stdin = sys.stdin.buffer
        async def readline(self):
            # a single call to sys.stdin.readline() is thread-safe
            return await loop.run_in_executor(None, self.stdin.readline)

    class Win32StdoutWriter:
        def __init__(self):
            self.buffer = []
            self.stdout = sys.stdout.buffer
        def write(self, data):
            self.buffer.append(data)
        async def drain(self):
            data, self.buffer = self.buffer, []
            def write_and_flush():
                # write the bytes to the binary stdout buffer, then flush so
                # the parent actually receives them
                self.stdout.writelines(data)
                self.stdout.flush()
            return await loop.run_in_executor(None, write_and_flush)

    return Win32StdinReader(), Win32StdoutWriter()
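In the child, the resulting pair is then used like any other asyncio stream. A minimal sketch, assuming a simple line-based protocol in which every incoming line is echoed back; `echo_lines()`, `_ListWriter` and `demo()` are illustrative names. The loop sticks to `readline()`, `write()` and `drain()`, the only methods the Windows fallback objects above provide, so it should work with either pair. The demo fakes the input with the low-level `StreamReader.feed_data()`/`feed_eof()` methods instead of a real pipe.

```python
import asyncio

async def echo_lines(reader, writer):
    """Echo every line from reader back to writer until EOF."""
    while True:
        line = await reader.readline()
        if not line:          # b"" means the parent closed our stdin
            break
        writer.write(line)
        await writer.drain()  # respect backpressure before reading more

# In the real child this would be something like:
#     async def main():
#         reader, writer = await stdio()
#         await echo_lines(reader, writer)
#     asyncio.run(main())

class _ListWriter:
    """Hypothetical stand-in writer that just records what was written."""
    def __init__(self):
        self.chunks = []
    def write(self, data):
        self.chunks.append(data)
    async def drain(self):
        pass

async def demo():
    # Feed a StreamReader by hand instead of connecting it to a pipe.
    reader = asyncio.StreamReader()
    reader.feed_data(b"hello\nworld\n")
    reader.feed_eof()
    writer = _ListWriter()
    await echo_lines(reader, writer)
    return writer.chunks

if __name__ == "__main__":
    print(asyncio.run(demo()))
```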

While perhaps a little outdated, I found this 2016 blog post by Nathaniel J. Smith on asyncio and curio to be hugely helpful in understanding how asyncio protocols, transports, backpressure and related pieces all interact and fit together. That article also shows why creating the reader and writer objects for stdio is so verbose and cumbersome at the moment.

answered Nov 07 '22 at 23:11 by Martijn Pieters