
Simple Python TCP forking server using asyncio

What I'm trying to do

I'm trying to emulate the behavior of the following simple socat(1) command:

socat tcp-listen:SOME_PORT,fork,reuseaddr exec:'SOME_PROGRAM'

The above command creates a forking TCP server: for each incoming connection it forks and executes SOME_PROGRAM, redirecting both stdin and stdout of that command to the TCP socket.

Here's what I'd like to achieve:

  1. Create a simple TCP server with asyncio to handle multiple concurrent connections.
  2. Whenever a connection is received, start SOME_PROGRAM as a sub-process.
  3. Pass any data received from the socket to SOME_PROGRAM's standard input.
  4. Pass any data received from SOME_PROGRAM's standard output to the socket.
  5. When SOME_PROGRAM exits, write a goodbye message along with the exit code to the socket and close the connection.
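
For example, with the ./test script shown further down as SOME_PROGRAM, a netcat session against the server should look roughly like this (the exact goodbye message here is just an illustration):

$ nc localhost 6666
a
Good!
b
Wonderful!
Goodbye! Exit code: 0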

I would like to do this in pure Python, using only the standard library's asyncio module and no external libraries.

What I have so far

Here's the code I wrote so far:

import asyncio

class ServerProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        self.client_addr   = transport.get_extra_info('peername')
        self.transport     = transport
        self.child_process = None

        print('Connection with {} established'.format(self.client_addr))

        asyncio.ensure_future(self._create_subprocess())

    def connection_lost(self, exception):
        print('Connection with {} closed.'.format(self.client_addr))

        # Terminate the child if it was spawned and is still running
        if self.child_process is not None and self.child_process.returncode is None:
            self.child_process.terminate()

    def data_received(self, data):
        print('Data received: {!r}'.format(data))

        # Make sure the process has been spawned
        # Does this even make sense? Looks so awkward to me...
        while self.child_process is None:
            continue

        # Write any received data to child_process' stdin
        self.child_process.stdin.write(data)

    async def _create_subprocess(self):
        self.child_process = await asyncio.create_subprocess_exec(
            *TARGET_PROGRAM,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE
        )

        # Start reading child stdout
        asyncio.ensure_future(self._pipe_child_stdout())

        # Ideally I would register some callback here so that when
        # child_process exits I can write to the socket a goodbye
        # message and close the connection, but I don't know how
        # I could do that...

    async def _pipe_child_stdout(self):
        # This does not seem to work, this function returns b'', that is an
        # empty buffer, AFTER the process exits...
        data = await self.child_process.stdout.read(100) # Arbitrary buffer size

        print('Child process data: {!r}'.format(data))

        if data:
            # Send to socket
            self.transport.write(data)
            # Reschedule to read more data
            asyncio.ensure_future(self._pipe_child_stdout())


SERVER_PORT    = 6666
TARGET_PROGRAM = ['./test']

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    coro = loop.create_server(ServerProtocol, '0.0.0.0', SERVER_PORT)
    server = loop.run_until_complete(coro)

    print('Serving on {}'.format(server.sockets[0].getsockname()))

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass

    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()

And here's the ./test program I'm trying to run as a subprocess:

#!/usr/bin/env python3

import sys

if sys.stdin.read(2) == 'a\n':
    sys.stdout.write('Good!\n')
else:
    sys.exit(1)

if sys.stdin.read(2) == 'b\n':
    sys.stdout.write('Wonderful!\n')
else:
    sys.exit(1)

sys.exit(0)

Unfortunately the above code doesn't really work, and I'm kind of lost about what to try next.

What works as intended:

  • The child process is correctly spawned, and it also seems to correctly receive the input from the socket: I can see it in htop, and I can also see that as soon as I send b\n it terminates.

What doesn't work as intended:

Basically anything else...

  • The child process' output is never sent to the socket, and in fact never read at all. The call await self.child_process.stdout.read(100) does not return while the child is running: it only returns after the child process dies, and the result is just b'' (an empty bytes object).
  • I'm not able to tell when the child process terminates: as I mentioned above, I'd like to send a "Goodbye" message to the socket along with self.child_process.returncode when this happens, but I don't know how to do this in a way that makes sense.

What I tried:

  • I tried creating the child process using asyncio.loop.subprocess_exec() instead of asyncio.create_subprocess_exec(). This solves the problem of knowing when the process terminates, since I can instantiate a subclass of asyncio.SubprocessProtocol and use its process_exited() method, but it doesn't really help me at all, since this way I don't have a way to talk to the process' stdin or stdout anymore! That is, I don't have a Process object to interact with... (a rough sketch of this attempt is shown after this list)
  • I tried playing around with asyncio.loop.connect_write_pipe() and loop.connect_read_pipe() with no luck.
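
Here's roughly what the SubprocessProtocol attempt looked like (a simplified, untested sketch):

import asyncio

class ChildProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        # fd 1 is the child's stdout
        print('Child output: {!r}'.format(data))

    def process_exited(self):
        # At least here I know when the child exits...
        print('Child exited')

loop = asyncio.get_event_loop()
transport, protocol = loop.run_until_complete(
    loop.subprocess_exec(ChildProtocol, './test',
                         stdin=asyncio.subprocess.PIPE,
                         stdout=asyncio.subprocess.PIPE))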

Questions

So, could someone please help me figure out what I'm doing wrong? There has to be a way to make this work smoothly. When I first started, I was looking for a way to just effortlessly use some pipe redirection, but I don't know if that's even possible at this point. Is it? It looks like it should be.

I could write this program in C in 15 minutes using fork(), exec() and dup2(), so there's something I must be missing! Any help is appreciated.

Asked May 03 '19 by Marco Bonelli



1 Answer

Your code has two immediate implementation issues:

  • The server strips the whitespace off received data before transmitting it to the subprocess. This removes the trailing newline, so if the TCP client sends "a\n", the subprocess will receive just "a". That way the subprocess never encounters the expected "a\n" string, and it always terminates after reading two bytes. This explains the empty string (EOF) coming from the subprocess. (Stripping has been removed in a subsequent edit to the question.)
  • The subprocess doesn't flush its output, so the server doesn't receive any of its writes. The writes are seen only when the subprocess exits, or when it fills up its output buffer, which measures in kilobytes and takes a while to fill when displaying short debugging messages.

The other issue is on the design level. As mentioned in the comments, unless your explicit intention is to implement a new asyncio protocol, it is recommended to stick to the higher-level stream-based API, in this case the start_server function. The even lower-level functionality like SubprocessProtocol, connect_write_pipe, and connect_read_pipe is also not something you would want to use in application code. The rest of this answer assumes a stream-based implementation.

start_server accepts a coroutine which will be spawned as a new task whenever a client connects. It is called with two asyncio stream arguments, one for reading and one for writing. The coroutine contains the logic of communicating to a client; in your case it would spawn the subprocess and transfer data between it and the client.
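
For orientation, the bare skeleton of such a server might look like this minimal sketch, with a trivial echo handler standing in for the real per-client logic:

import asyncio

async def handle_client(reader, writer):
    # Spawned as a new task for each connection; reader and writer
    # are asyncio streams tied to the client socket.
    data = await reader.read(1024)
    writer.write(data)  # trivial echo in place of the real logic
    writer.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.start_server(handle_client, '0.0.0.0', 6666))
loop.run_forever()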

Note that the bidirectional data transfer between the socket and the subprocess cannot be achieved with a simple loop with reads followed by writes. For example, consider this loop:

# INCORRECT: can deadlock (and also doesn't detect EOF)
child = await asyncio.create_subprocess_exec(...)
while True:
    proc_data = await child.stdout.read(1024)  # (1)
    sock_writer.write(proc_data)
    sock_data = await sock_reader.read(1024)
    child.stdin.write(sock_data)               # (2)

This kind of loop is prone to deadlocks. If the subprocess is responding to data it receives from the TCP client, it will sometimes only provide output after it receives some input. That will block the loop at (1) indefinitely, because it can get the data from the child's stdout only after sending the child the sock_data, which happens later, at (2). Effectively, (1) waits for (2) and vice versa, constituting a deadlock. Note that reversing the order of transfers won't help, because then the loop would deadlock if the TCP client is processing the output of the server's subprocess.

With asyncio at our disposal, such a deadlock is easy to avoid: simply spawn two coroutines in parallel, one that transfers data from the socket to the subprocess's stdin, and another that transfers data from the subprocess's stdout to the socket. For example:

# correct: deadlock-free (and detects EOF)
async def _transfer(src, dest):
    while True:
        data = await src.read(1024)
        if data == b'':
            break
        dest.write(data)

child = await asyncio.create_subprocess_exec(...)
loop.create_task(_transfer(child.stdout, sock_writer))
loop.create_task(_transfer(sock_reader, child.stdin))
await child.wait()

The difference between this setup and the first while loop is that here the two transfers are independent of each other. The deadlock cannot occur because the read from the socket never waits for the read from the subprocess and vice versa.

Applied to the question, the whole server would look like this:

import asyncio

class ProcServer:
    async def _transfer(self, src, dest):
        while True:
            data = await src.read(1024)
            if data == b'':
                break
            dest.write(data)

    async def _handle_client(self, r, w):
        loop = asyncio.get_event_loop()
        print(f'Connection from {w.get_extra_info("peername")}')
        child = await asyncio.create_subprocess_exec(
            *TARGET_PROGRAM, stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE)
        sock_to_child = loop.create_task(self._transfer(r, child.stdin))
        child_to_sock = loop.create_task(self._transfer(child.stdout, w))
        await child.wait()
        sock_to_child.cancel()
        child_to_sock.cancel()
        w.write(b'Process exited with status %d\n' % child.returncode)
        w.close()

    async def start_serving(self):
        await asyncio.start_server(self._handle_client,
                                   '0.0.0.0', SERVER_PORT)

SERVER_PORT    = 6666
TARGET_PROGRAM = ['./test']

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    server = ProcServer()
    loop.run_until_complete(server.start_serving())
    loop.run_forever()

The accompanying test program must also be modified to call sys.stdout.flush() after every sys.stdout.write(), otherwise the messages linger in its stdio buffers instead of being sent to the parent.
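
For completeness, here is the test program with those flushes added:

#!/usr/bin/env python3

import sys

if sys.stdin.read(2) == 'a\n':
    sys.stdout.write('Good!\n')
    sys.stdout.flush()
else:
    sys.exit(1)

if sys.stdin.read(2) == 'b\n':
    sys.stdout.write('Wonderful!\n')
    sys.stdout.flush()
else:
    sys.exit(1)

sys.exit(0)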

When I first started I was looking for a way to just effortlessly use some pipe redirection, but I don't know if that's even possible at this point. Is it? It looks like it should be.

On Unix-like systems it is certainly possible to redirect a socket to a spawned subprocess, so that the subprocess is directly talking to the client. (The old inetd Unix server works like this.) But that mode of operation is not supported by asyncio, for two reasons:

  • it does not work on all systems supported by Python and asyncio, in particular on Windows.
  • it is incompatible with core asyncio features, such as transports/protocols and streams, which assume ownership and exclusive access to the underlying sockets.

Even if you don't care about portability, consider the second point: you might need to process or log the data exchanged between the TCP client and the subprocess, and you can't do that if they're welded together on the kernel level. Also, timeouts and cancelation are much easier to implement in asyncio coroutines than when dealing with just an opaque subprocess.
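
For example, a per-connection timeout (not part of the original requirements, just an illustration) could be layered onto _handle_client with asyncio.wait_for:

try:
    # Give the child at most 30 seconds (an arbitrary value) to finish.
    await asyncio.wait_for(child.wait(), timeout=30)
except asyncio.TimeoutError:
    child.kill()
    await child.wait()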

If non-portability and the inability to control the communication are acceptable for your use case, then you probably don't need asyncio in the first place - nothing prevents you from spawning a thread that runs a classic blocking server which handles each client with the same sequence of os.fork, os.dup2, and os.execlp that you'd write in C (a sketch of that approach follows).
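
For comparison, such a classic blocking forking server might look like this sketch (Unix-only and untested, with SERVER_PORT and TARGET_PROGRAM as in the question):

import os
import signal
import socket

SERVER_PORT    = 6666
TARGET_PROGRAM = ['./test']

def serve_forever():
    # Automatically reap children so they don't linger as zombies.
    signal.signal(signal.SIGCHLD, signal.SIG_IGN)
    with socket.socket() as srv:
        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srv.bind(('0.0.0.0', SERVER_PORT))
        srv.listen()
        while True:
            conn, _addr = srv.accept()
            if os.fork() == 0:
                # Child: wire the socket to stdin/stdout and exec.
                os.dup2(conn.fileno(), 0)
                os.dup2(conn.fileno(), 1)
                srv.close()
                os.execlp(TARGET_PROGRAM[0], *TARGET_PROGRAM)
            conn.close()  # the parent keeps only the listening socket

if __name__ == '__main__':
    serve_forever()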

EDIT

As the OP points out in a comment, the original code handles the TCP client disconnecting by killing the child process. At the stream layer, a connection loss is reflected by the stream either signaling end-of-file or raising an exception. In the above code, one could easily react to connection loss by replacing the generic self._transfer() with a more specific coroutine that handles that case. For example, instead of:

sock_to_child = loop.create_task(self._transfer(r, child.stdin))

...one could write:

sock_to_child = loop.create_task(self._sock_to_child(r, child))

and define _sock_to_child like this (untested):

async def _sock_to_child(self, reader, child):
    try:
        await self._transfer(reader, child.stdin)
    except IOError as e:
        # IO errors are an expected part of the workflow,
        # we don't want to propagate them
        print('exception:', e)
    child.kill()

If the child outlives the TCP client, the child.kill() line will likely never execute because the coroutine will be canceled by _handle_client while suspended in src.read() inside _transfer().

Answered by user4815162342