
blocks - send input to python subprocess pipeline

I found out how to do it.

It is not about threads, and not about select().

When I run the first process (grep), it creates two low-level file descriptors, one for each pipe. Let's call those a and b.

When I run the second process, b gets passed to cut's stdin. But there is a brain-dead default on Popen - close_fds=False.

The effect of that is that cut also inherits a. So grep never sees EOF and can't die even if I close a, because the write end of its stdin pipe is still open in cut's process (cut ignores it).

The following code now runs perfectly.

from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n')
p1.stdin.close()
result = p2.stdout.read() 
assert result == "Hello Worl\n"

close_fds=True SHOULD BE THE DEFAULT on Unix systems. On Windows it closes all fds, so it prevents piping.

EDIT:

PS: For people with a similar problem reading this answer: As pooryorick said in a comment, this could also block if the data written to p1.stdin is bigger than the pipe buffers. In that case you should chunk the data into smaller pieces, and use select.select() to know when to read/write. The code in the question should give a hint on how to implement that.
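A rough sketch of that chunk-and-select approach, in case it helps. It is written for Python 3 (bytes in, bytes out) rather than the Python 2 used above, the helper name grep_cut_select is made up for this illustration, and it assumes a Unix system with grep and cut on the PATH. It writes at most select.PIPE_BUF bytes whenever select() reports the pipe writable, and drains p2.stdout whenever it is readable.

```python
import os
import select
from subprocess import Popen, PIPE

def grep_cut_select(data):
    """Push `data` (bytes) through grep | cut, interleaving writes and reads."""
    p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
    p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
    p1.stdout.close()  # only cut needs this pipe end now

    in_fd = p1.stdin.fileno()
    out_fd = p2.stdout.fileno()
    view = memoryview(data)
    offset = 0
    chunks = []
    while True:
        if in_fd is not None and offset >= len(view):
            p1.stdin.close()  # everything written: let grep see EOF
            in_fd = None
        want_write = [in_fd] if in_fd is not None else []
        readable, writable, _ = select.select([out_fd], want_write, [])
        if writable:
            # A write of at most PIPE_BUF bytes does not block once
            # select() has reported the pipe as writable.
            offset += os.write(in_fd, view[offset:offset + select.PIPE_BUF])
        if readable:
            chunk = os.read(out_fd, 65536)
            if not chunk:  # EOF from cut
                break
            chunks.append(chunk)
    p2.stdout.close()
    p1.wait()
    p2.wait()
    return b"".join(chunks)
```

Calling grep_cut_select(b'Hello World\n' * 20000) should return without hanging, because the single thread never commits to a write the pipe cannot absorb.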

EDIT2: Found another solution, with more help from pooryorick: instead of using close_fds=True to close ALL fds, one could close just the fds that belong to the first process when executing the second, and it will work. The closing must be done in the child, so the preexec_fn argument of Popen comes in very handy for that. When executing p2 you can do:

p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=devnull, preexec_fn=p1.stdin.close)
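For completeness, here is that one-liner in a self-contained form. This is only a sketch: it is written for Python 3, grep_cut_preexec is a hypothetical helper name, subprocess.DEVNULL stands in for the devnull variable, and the grep/cut commands are the ones from the earlier snippet. On Python 3 the pipe descriptors are not inherited by default anyway, so the hook is redundant there and is shown only to illustrate where the close happens.

```python
from subprocess import Popen, PIPE, DEVNULL

def grep_cut_preexec(data):
    """Small-input version of the pipeline using preexec_fn (Unix only)."""
    p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
    # preexec_fn runs in the forked child just before exec, so the
    # child's copy of grep's stdin pipe is closed there, not in the parent.
    p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE,
               stderr=DEVNULL, preexec_fn=p1.stdin.close)
    p1.stdout.close()      # the parent does not read grep's output itself
    p1.stdin.write(data)   # fine for small inputs only
    p1.stdin.close()
    result = p2.stdout.read()
    p2.stdout.close()
    p1.wait()
    p2.wait()
    return result
```

Note that preexec_fn is not safe to combine with threads in the parent, so this fits single-threaded scripts best.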

Working with large files

Two principles need to be applied uniformly when working with large files in Python.

  1. Since any IO routine can block, we must keep each stage of the pipeline in a different thread or process. We use threads in this example, but subprocesses would let you avoid the GIL.
  2. We must use incremental reads and writes so that we don't wait for EOF before starting to make progress.

An alternative is to use nonblocking IO, though this is cumbersome in standard Python. See gevent for a lightweight threading library that implements the synchronous IO API using nonblocking primitives.

Example code

We'll construct a silly pipeline that is roughly

{cat /usr/share/dict/words} | grep -v not              \
    | {upcase, filtered tee to stderr} | cut -c 1-10   \
    | {translate 'E' to '3'} | grep K | grep Z | {downcase}

where each stage in braces {} is implemented in Python while the others use standard external programs. TL;DR: See this gist.

We start with the expected imports.

#!/usr/bin/env python
from subprocess import Popen, PIPE
import sys, threading

Python stages of the pipeline

All but the last Python-implemented stage of the pipeline need to go in a thread so that their IO does not block the others. These could instead run in Python subprocesses if you wanted them to actually run in parallel (avoiding the GIL).

def writer(output):
    for line in open('/usr/share/dict/words'):
        output.write(line)
    output.close()
def filter(input, output):
    for line in input:
        if 'k' in line and 'z' in line: # Selective 'tee'
            sys.stderr.write('### ' + line)
        output.write(line.upper())
    output.close()
def leeter(input, output):
    for line in input:
        output.write(line.replace('E', '3'))
    output.close()

Each of these needs to be put in its own thread, which we'll do using this convenience function.

def spawn(func, **kwargs):
    t = threading.Thread(target=func, kwargs=kwargs)
    t.start()
    return t

Create the pipeline

Create the external stages using Popen and the Python stages using spawn. The argument bufsize=-1 says to use the system default buffering (usually 4 kiB). This is generally faster than the default (unbuffered) or line buffering, but you'll want line buffering if you want to visually monitor the output without lags.

grepv = Popen(['grep', '-v', 'not'], stdin=PIPE, stdout=PIPE, bufsize=-1)
cut   = Popen(['cut', '-c', '1-10'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepk = Popen(['grep', 'K'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepz = Popen(['grep', 'Z'], stdin=grepk.stdout, stdout=PIPE, bufsize=-1)

twriter = spawn(writer, output=grepv.stdin)
tfilter = spawn(filter, input=grepv.stdout, output=cut.stdin)
tleeter = spawn(leeter, input=cut.stdout, output=grepk.stdin)

Drive the pipeline

Assembled as above, all the buffers in the pipeline will fill up, but since nobody is reading from the end (grepz.stdout), they will all block. We could read the entire thing in one call to grepz.stdout.read(), but that would use a lot of memory for large files. Instead, we read incrementally.

for line in grepz.stdout:
    sys.stdout.write(line.lower())

The threads and processes clean up once they reach EOF. We can explicitly clean up using

for t in [twriter, tfilter, tleeter]: t.join()
for p in [grepv, cut, grepk, grepz]: p.wait()

Python-2.6 and earlier

Internally, subprocess.Popen calls fork, configures the pipe file descriptors, and calls exec. The child process from fork has copies of all file descriptors in the parent process, and both copies will need to be closed before the corresponding reader will get EOF. This can be fixed by manually closing the pipes (either by close_fds=True or a suitable preexec_fn argument to subprocess.Popen) or by setting the FD_CLOEXEC flag to have exec automatically close the file descriptor. This flag is set automatically in Python-2.7 and later, see issue12786. We can get the Python-2.7 behavior in earlier versions of Python by calling

p._set_cloexec_flag(p.stdin)

before passing p.stdin as an argument to a subsequent subprocess.Popen.
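If you would rather not rely on a private Popen method, the same flag can be set with the public fcntl module. A minimal sketch, assuming a Unix system; set_cloexec and has_cloexec are names invented here, and the small demo at the bottom uses os.set_inheritable (Python 3.4+) only to start from a descriptor that does not already have the flag.

```python
import fcntl
import os

def _fileno(f):
    """Accept either an integer descriptor or a file-like object."""
    return f if isinstance(f, int) else f.fileno()

def set_cloexec(f):
    """Mark a descriptor close-on-exec so exec'd children do not keep it."""
    fd = _fileno(f)
    flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)

def has_cloexec(f):
    """Report whether FD_CLOEXEC is currently set on the descriptor."""
    return bool(fcntl.fcntl(_fileno(f), fcntl.F_GETFD) & fcntl.FD_CLOEXEC)

# Demo on a throwaway pipe.
r, w = os.pipe()
os.set_inheritable(w, True)   # clear the flag so the demo shows a change
before = has_cloexec(w)
set_cloexec(w)
after = has_cloexec(w)
os.close(r)
os.close(w)
```

You would call set_cloexec(p.stdin) at the same point, before handing the pipe to the next Popen.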


There are three main tricks to making pipes work as expected:

  1. Make sure each end of the pipe is used in a different thread/process (some of the examples near the top suffer from this problem).

  2. Explicitly close the unused end of the pipe in each process.

  3. Deal with buffering by either disabling it (Python -u option), using ptys, or simply filling up the buffer with something that won't affect the data (maybe '\n', but whatever fits).
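The first two tricks can be sketched with the low-level calls directly. This is an illustration under assumptions, not code from the pipeline module: read_from_child is a made-up name, it is Python 3, and it needs a Unix system for os.fork. The child is the only writer and the parent is the only reader, and each closes the end it does not use.

```python
import os

def read_from_child(lines):
    """Fork a child that writes `lines` (bytes) to a pipe; return what the parent reads."""
    r, w = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: the writer. It never reads, so close the read end.
        status = 1
        try:
            os.close(r)
            with os.fdopen(w, "wb") as out:
                for line in lines:
                    out.write(line)
            status = 0
        finally:
            os._exit(status)  # skip the parent's cleanup handlers
    # Parent: the reader. Close the write end, otherwise read() below
    # would wait forever because a writer (ourselves) still exists.
    os.close(w)
    with os.fdopen(r, "rb") as src:
        data = src.read()
    os.waitpid(pid, 0)
    return data
```

Try commenting out the os.close(w) in the parent: the read never returns, which is the same hang the close_fds discussion is about.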

The examples in the Python "pipeline" module (I'm the author) fit your scenario exactly, and make the low-level steps fairly clear.

http://pypi.python.org/pypi/pipeline/

More recently, I used the subprocess module as part of a producer-processor-consumer-controller pattern:

http://www.darkarchive.org/w/Pub/PythonInteract

This example deals with buffered stdin without resorting to using a pty, and also illustrates which pipe ends should be closed where. I prefer processes to threading, but the principle is the same. Additionally, it illustrates synchronizing the Queues which feed the producer and collect output from the consumer, and how to shut them down cleanly (look out for the sentinels inserted into the queues). This pattern allows new input to be generated based on recent output, allowing for recursive discovery and processing.


Nosklo's offered solution will quickly break if too much data is written to the receiving end of the pipe:


from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n' * 20000)
p1.stdin.close()
result = p2.stdout.read() 
assert result == "Hello Worl\n" * 20000

If this script doesn't hang on your machine, just increase "20000" to something that exceeds the size of your operating system's pipe buffers.

This is because the operating system is buffering the input to "grep", but once that buffer is full, the p1.stdin.write call will block until something reads from p2.stdout. In toy scenarios, you can get away with writing to and reading from a pipe in the same process, but in normal usage, it is necessary to write from one thread/process and read from a separate thread/process. This is true for subprocess.Popen, os.pipe, os.popen*, etc.

Another twist is that sometimes you want to keep feeding the pipe with items generated from earlier output of the same pipe. The solution is to make both the pipe feeder and the pipe reader asynchronous to the main program, and implement two queues: one between the main program and the pipe feeder and one between the main program and the pipe reader. PythonInteract is an example of that.
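A stripped-down sketch of the two-queue arrangement, not the PythonInteract code itself. It uses threads for brevity, is written for Python 3, and uses cat as a stand-in child because it echoes each line as soon as it arrives (an assumption about the platform's cat). The names feeder, reader and countdown are invented for the example. Each output line n causes the main program to feed n - 1 back in until it reaches 0.

```python
import queue
import threading
from subprocess import Popen, PIPE

SENTINEL = None  # marks "no more items" on either queue

def feeder(inq, pipe):
    """Move items from the input queue into the child's stdin."""
    while True:
        item = inq.get()
        if item is SENTINEL:
            break
        pipe.write(item)
        pipe.flush()  # do not let our own buffer hold the line back
    pipe.close()      # child sees EOF

def reader(pipe, outq):
    """Move lines from the child's stdout onto the output queue."""
    for line in pipe:
        outq.put(line)
    outq.put(SENTINEL)

def countdown(start):
    proc = Popen(["cat"], stdin=PIPE, stdout=PIPE)
    inq, outq = queue.Queue(), queue.Queue()
    threads = [threading.Thread(target=feeder, args=(inq, proc.stdin)),
               threading.Thread(target=reader, args=(proc.stdout, outq))]
    for t in threads:
        t.start()
    seen = []
    inq.put(b"%d\n" % start)
    while True:
        line = outq.get()
        if line is SENTINEL:
            break
        n = int(line.decode())
        seen.append(n)
        # New input is derived from the output we just received.
        inq.put(b"%d\n" % (n - 1) if n > 0 else SENTINEL)
    for t in threads:
        t.join()
    proc.wait()
    return seen
```

The main program only ever touches the queues, so it cannot block on either pipe.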

Subprocess is a nice convenience module, but because it hides the details of the os.pipe and os.fork calls it makes under the hood, it can sometimes be more difficult to deal with than the lower-level calls it utilizes. For this reason, subprocess is not a good way to learn about how inter-process pipes really work.


You must do this in several threads. Otherwise, you'll end up in a situation where you can't send data: child p1 won't read your input since p2 doesn't read p1's output because you don't read p2's output.

So you need a background thread that reads what p2 writes out. That will allow p2 to continue after writing some data to the pipe, so it can read the next line of input from p1 which again allows p1 to process the data which you send to it.

Alternatively, you can send the data to p1 with a background thread and read the output from p2 in the main thread. But either side must be a thread.
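A minimal sketch of that second variant (writer in a background thread, reader in the main thread). It assumes Python 3, a Unix system with grep and cut installed, and reuses the commands from the question; grep_cut_threaded is a hypothetical name.

```python
import threading
from subprocess import Popen, PIPE

def grep_cut_threaded(data):
    """Feed `data` (bytes) to grep | cut from a thread while the caller reads."""
    p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
    p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
    p1.stdout.close()  # only cut should hold the read end of this pipe

    def feed():
        # May block when the pipe is full; that is fine, because the
        # main thread keeps draining p2.stdout in the meantime.
        p1.stdin.write(data)
        p1.stdin.close()

    t = threading.Thread(target=feed)
    t.start()
    result = p2.stdout.read()
    t.join()
    p2.stdout.close()
    p1.wait()
    p2.wait()
    return result
```

With this arrangement the 20000-line input that hangs the single-threaded version goes through fine.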


Responding to nosklo's assertion (see other comments to this question) that it can't be done without close_fds=True:

close_fds=True is only necessary if you've left other file descriptors open. When opening multiple child processes, it's always good to keep track of open files that might get inherited, and to explicitly close any that aren't needed:

from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p1.stdin.write('Hello World\n')
p1.stdin.close()
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
result = p2.stdout.read() 
assert result == "Hello Worl\n"

close_fds defaults to False because subprocess prefers to trust the calling program to know what it's doing with open file descriptors, and just provide the caller with an easy option to close them all if that's what it wants to do.

But the real issue is that pipe buffers will bite you for all but toy examples. As I have said in my other answers to this question, the rule of thumb is to not have your reader and your writer open in the same process/thread. Anyone who wants to use the subprocess module for two-way communication would be well served to study os.pipe and os.fork first. They're actually not that hard to use if you have a good example to look at.