I need to write two programs that will be run as a parent process and its child. The parent process spawns the child, and then they communicate via a pair of pipes connected to the child's stdin and stdout. The communication is peer-to-peer, which is why I need asyncio; a simple read/reply loop won't do.
I have written the parent without problems, because asyncio provides everything I needed in create_subprocess_exec().
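For context, here is a minimal sketch of what the parent side looks like with create_subprocess_exec(); the inline one-liner child is just a stand-in for the real child script:

```python
import asyncio
import sys

async def main():
    # Spawn a child whose stdin/stdout are connected to pipes.
    # The inline echo program here is a placeholder for the real child.
    proc = await asyncio.create_subprocess_exec(
        sys.executable, '-c',
        'import sys; sys.stdout.write("reply: " + sys.stdin.readline())',
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE)
    # Write a line to the child's stdin, then read its reply from stdout.
    proc.stdin.write(b'ping\n')
    await proc.stdin.drain()
    reply = await proc.stdout.readline()
    proc.stdin.close()
    await proc.wait()
    return reply

print(asyncio.run(main()))  # b'reply: ping\n'
```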
However, I don't know how to create a similar stream reader/writer in the child. I did not expect any problems, because the pipes are already created and file descriptors 0 and 1 are ready to use when the child process starts. No connection needs to be opened, no process needs to be spawned.
This is my non-working attempt:
import asyncio
import sys

_DEFAULT_LIMIT = 64 * 1024

async def connect_stdin_stdout(limit=_DEFAULT_LIMIT, loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()
    reader = asyncio.StreamReader(limit=limit, loop=loop)
    protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
    r_transport, _ = await loop.connect_read_pipe(lambda: protocol, sys.stdin)
    w_transport, _ = await loop.connect_write_pipe(lambda: protocol, sys.stdout)
    writer = asyncio.StreamWriter(w_transport, protocol, reader, loop)
    return reader, writer
The problem is that I have two transports where I should have one. The function fails because it tries to set the protocol's transport twice:
await loop.connect_read_pipe(lambda: protocol, sys.stdin)
await loop.connect_write_pipe(lambda: protocol, sys.stdout)
# !!!! assert self._transport is None, 'Transport already set'
I tried to pass a dummy protocol to the first line, but this line is not correct either, because both transports are needed, not just one:
writer = asyncio.StreamWriter(w_transport, protocol, reader, loop)
I guess I need to somehow combine the two unidirectional transports into one bidirectional transport. Or is my approach entirely wrong? Could you please give me some advice?
UPDATE: after some testing, this seems to work (but it does not look good to me):
async def connect_stdin_stdout(limit=_DEFAULT_LIMIT, loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()
    reader = asyncio.StreamReader(limit=limit, loop=loop)
    protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
    dummy = asyncio.Protocol()
    await loop.connect_read_pipe(lambda: protocol, sys.stdin)  # sets read_transport
    w_transport, _ = await loop.connect_write_pipe(lambda: dummy, sys.stdout)
    writer = asyncio.StreamWriter(w_transport, protocol, reader, loop)
    return reader, writer
Your first version fails because you are using the wrong protocol for the writer side; the StreamReaderProtocol implements hooks to react to incoming connections and data, something the writing side doesn't and shouldn't have to deal with.
The loop.connect_write_pipe() coroutine uses the protocol factory you pass in and returns the resulting protocol instance. You do want to use that same protocol object in the stream writer, instead of the protocol used for the reader.
Next, you do not want to pass the stdin reader to the stdout stream writer! That class assumes that the reader and writer are connected to the same file descriptor, and that's really not the case here.
In the recent past I've built the following to handle stdio for a child process; the stdio() function is based on the Nathan Hoad gist on the subject, plus a fallback for Windows, where support for treating stdio as pipes is limited.
You do want the writer to handle backpressure properly, so my version uses the (undocumented) asyncio.streams.FlowControlMixin class as the protocol for this; you really don't need anything more than that:
import asyncio
import os
import sys

async def stdio(limit=asyncio.streams._DEFAULT_LIMIT, loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()
    if sys.platform == 'win32':
        return _win32_stdio(loop)
    reader = asyncio.StreamReader(limit=limit, loop=loop)
    await loop.connect_read_pipe(
        lambda: asyncio.StreamReaderProtocol(reader, loop=loop), sys.stdin)
    writer_transport, writer_protocol = await loop.connect_write_pipe(
        lambda: asyncio.streams.FlowControlMixin(loop=loop),
        os.fdopen(sys.stdout.fileno(), 'wb'))
    writer = asyncio.streams.StreamWriter(
        writer_transport, writer_protocol, None, loop)
    return reader, writer
def _win32_stdio(loop):
    # no support for asyncio stdio yet on Windows, see https://bugs.python.org/issue26832
    # use an executor to read from stdin and write to stdout
    # note: if nothing ever drains the writer explicitly, no flushing ever takes place!
    class Win32StdinReader:
        def __init__(self):
            self.stdin = sys.stdin.buffer

        async def readline(self):
            # a single call to sys.stdin.readline() is thread-safe
            return await loop.run_in_executor(None, self.stdin.readline)

    class Win32StdoutWriter:
        def __init__(self):
            self.buffer = []
            self.stdout = sys.stdout.buffer

        def write(self, data):
            self.buffer.append(data)

        async def drain(self):
            data, self.buffer = self.buffer, []
            # a single call to sys.stdout.buffer.writelines() is thread-safe
            return await loop.run_in_executor(None, self.stdout.writelines, data)

    return Win32StdinReader(), Win32StdoutWriter()
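The reader and writer halves built this way are not tied to the real stdin/stdout; the same technique works on any pipe file descriptor. Here is a minimal self-contained sketch (my own, not part of the code above) that uses os.pipe() pairs as stand-ins for the stdin/stdout pipes and runs one read/reply round trip, assuming a Unix-style event loop:

```python
import asyncio
import os

async def demo():
    loop = asyncio.get_running_loop()
    # Stand-ins for the stdin/stdout pipes a parent would create.
    stdin_r, stdin_w = os.pipe()
    stdout_r, stdout_w = os.pipe()

    # Reader half, as in stdio(): a StreamReaderProtocol on a read pipe.
    reader = asyncio.StreamReader()
    await loop.connect_read_pipe(
        lambda: asyncio.StreamReaderProtocol(reader),
        os.fdopen(stdin_r, 'rb'))

    # Writer half: FlowControlMixin gives drain() its pause/resume logic.
    w_transport, w_protocol = await loop.connect_write_pipe(
        lambda: asyncio.streams.FlowControlMixin(),
        os.fdopen(stdout_w, 'wb'))
    writer = asyncio.streams.StreamWriter(w_transport, w_protocol, None, loop)

    # Feed the "stdin" side, then read a line and write a reply.
    os.write(stdin_w, b'ping\n')
    line = await reader.readline()
    writer.write(b'pong: ' + line)
    await writer.drain()
    writer.close()
    os.close(stdin_w)
    return os.read(stdout_r, 100)

print(asyncio.run(demo()))  # b'pong: ping\n'
```

In the child itself you would of course pass sys.stdin and the reopened stdout instead of the os.fdopen()'d pipe ends, which is exactly what stdio() does.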
While perhaps a little outdated, I found this 2016 blog post by Nathaniel J. Smith on asyncio and curio to be hugely helpful in understanding how asyncio, protocols, transports, backpressure and such all interact and hang together. That article also shows why creating the reader and writer objects for stdio is so verbose and cumbersome at the moment.