Why can Popen.stdout.readline deadlock and what to do about it?

From the Python documentation

Warning Use communicate() rather than .stdin.write, .stdout.read or .stderr.read to avoid deadlocks due to any of the other OS pipe buffers filling up and blocking the child process.

I'm trying to understand why this would deadlock. For some background, I am spawning N processes in parallel:

for c in commands:
    h = subprocess.Popen(c, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
    handles.append(h)

Then printing the output of each process 1-by-1:

for handle in handles:
    while handle.poll() is None:
        try:
            line = handle.stdout.readline()
        except UnicodeDecodeError:
            line = "((INVALID UNICODE))\n"

        sys.stdout.write(line)
    if handle.returncode != 0:
        print(handle.stdout.read(), file=sys.stdout)
    if handle.returncode != 0:
        print(handle.stderr.read(), file=sys.stderr)

Occasionally this does in fact deadlock. Unfortunately, the documentation's recommendation to use communicate() is not going to work for me, because this process could take several minutes to run, and I don't want it to appear dead during this time. It should print output in real time.

I have several options, such as changing the bufsize argument, polling in a different thread for each handle, etc. But in order to decide on the best fix, I think I need to understand the fundamental reason for the deadlock in the first place. Something to do with buffer sizes, apparently, but what? I can hypothesize that maybe all of these processes share a single OS kernel object, and because I'm only draining the buffer of one of the processes, the others fill it up, in which case the second option (a polling thread per handle) would probably fix it. But maybe that's not even the real problem.

Can anyone shed some light on this?

asked Oct 30 '25 by Zachary Turner

1 Answer

Bidirectional communication between the parent and child processes uses two unidirectional pipes, one for each direction. (OK, stderr is a third one, but the idea is the same.)

A pipe has two ends: one for writing, one for reading. The capacity of a pipe used to be 4 KiB and is 64 KiB on modern Linux; similar values can be expected on other systems. This means the writer can write to a pipe without blocking up to that limit, but then the pipe is full and any further write blocks until the reader drains some data from the other end.
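The capacity can be measured empirically. The sketch below (my own illustration, not from the answer) makes the write end of a fresh pipe non-blocking and writes 1 KiB chunks until the kernel refuses:

```python
import os

# Measure the pipe capacity: make the write end non-blocking and write
# 1 KiB chunks until the kernel rejects the write with BlockingIOError.
r, w = os.pipe()
os.set_blocking(w, False)

total = 0
try:
    while True:
        total += os.write(w, b'\0' * 1024)
except BlockingIOError:
    pass

print(f"pipe capacity: {total} bytes")  # typically 65536 on modern Linux
os.close(r)
os.close(w)
```

With a blocking write end, that last write would not fail; it would simply hang, which is exactly the deadlock ingredient described above.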

From the reader's point of view, the situation is obvious: a regular read blocks until data is available.

To summarize: a deadlock occurs when a process attempts to read from a pipe that nobody is writing to, or when it writes more data than the pipe's capacity to a pipe that nobody is reading from.
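This is precisely what can happen with the question's loop: the child fills its stderr pipe while the parent only drains stdout. A hypothetical reproduction (the 200 000-byte payload and the watchdog thread are my own choices, used only to detect the hang without freezing the demo):

```python
import subprocess
import sys
import threading

# The child fills its stderr pipe (more than the ~64 KiB capacity) before
# printing anything to stdout. The child blocks in write(2) on stderr;
# the parent's readline() on stdout blocks waiting for a line that never
# comes. A watchdog thread detects the stall instead of hanging forever.
child_src = (
    "import sys\n"
    "sys.stderr.write('x' * 200_000)\n"   # exceeds the pipe capacity
    "print('done')\n"                     # never reached while stderr is full
)
proc = subprocess.Popen([sys.executable, '-c', child_src],
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                        text=True)

result = []
t = threading.Thread(target=lambda: result.append(proc.stdout.readline()))
t.start()
t.join(timeout=2)
deadlocked = t.is_alive()   # True: both processes are stuck
print("deadlocked:", deadlocked)

proc.kill()                 # break the deadlock so the demo can exit
proc.stderr.read()          # drain the leftover stderr data
t.join()                    # readline() now returns '' (EOF)
proc.wait()
```

Shrink the payload below the pipe capacity and the child runs to completion; the deadlock needs both a full pipe and a reader looking at the wrong one.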

Typically the two processes act as a client and a server and use some kind of request/response communication, something like half-duplex: one side writes while the other reads, and then they switch roles. This is practically the most complex setup we can handle with standard synchronous programming, and a deadlock can still occur when the client and server somehow get out of sync. This can be caused by an empty response, an unexpected error message, etc.
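A minimal sketch of that half-duplex pattern (the echo child and the request strings are invented for illustration): the parent writes one request line, flushes, then reads exactly one reply, so both sides always agree whose turn it is.

```python
import subprocess
import sys

# Half-duplex request/response: the parent writes one line, then reads one
# reply. Deadlock-free only while both sides keep to this strict turn-taking.
child_src = (
    "import sys\n"
    "for line in sys.stdin:\n"
    "    print('echo: ' + line.strip(), flush=True)\n"
)
proc = subprocess.Popen([sys.executable, '-c', child_src],
                        stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                        text=True)
replies = []
for req in ('ping', 'pong'):
    proc.stdin.write(req + '\n')
    proc.stdin.flush()                         # hand the turn to the child
    replies.append(proc.stdout.readline().strip())
proc.stdin.close()
proc.wait()
print(replies)
```

If the child ever sent two lines per request, or none, the two sides would fall out of step and one of them would block forever.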

If there are several child processes, or the communication protocol is not so simple, or we just want a robust solution, we need the parent to operate on all the pipes at once. communicate() uses threads for this purpose. The other approach is asynchronous I/O: first check which pipe is ready for I/O, and only then read from or write to it. The old and deprecated asyncore library implemented that.

At the low level, the select (or a similar) system call checks which file descriptors from a given set are ready for I/O. But at that level, we can do only one read or write before re-checking. That is the problem with this snippet:

while handle.poll() is None:
    try:
        line = handle.stdout.readline()
    except UnicodeDecodeError:
        line = "((INVALID UNICODE))\n"

The poll check tells us there is something to be read, but this does not mean we will be able to read repeatedly until a newline! We can only do one read and append the data to an input buffer. If the buffer contains a newline, we can extract the whole line and process it. If not, we need to wait for the next successful poll and read.

Writes behave similarly: we can write once, check the number of bytes written, and remove that many bytes from the output buffer.
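The read side of this one-operation-per-readiness discipline can be sketched with Python's selectors module (the three trivial children and the 4096-byte read size are my own illustration choices):

```python
import os
import selectors
import subprocess
import sys

# Register each child's stdout with a selector, do exactly one os.read()
# per readiness event, and assemble complete lines in a per-pipe buffer --
# the line buffering described in the text.
sel = selectors.DefaultSelector()
procs = []
for i in range(3):
    p = subprocess.Popen([sys.executable, '-c', f"print('hello from {i}')"],
                         stdout=subprocess.PIPE)
    sel.register(p.stdout, selectors.EVENT_READ, data=bytearray())
    procs.append(p)

lines = []
open_pipes = len(procs)
while open_pipes:
    for key, _ in sel.select():
        chunk = os.read(key.fileobj.fileno(), 4096)  # one read, then re-check
        if not chunk:                    # EOF: the child closed its end
            sel.unregister(key.fileobj)
            open_pipes -= 1
            continue
        key.data.extend(chunk)
        while b'\n' in key.data:         # extract only complete lines
            line, _, rest = key.data.partition(b'\n')
            lines.append(line.decode())
            key.data[:] = rest
for p in procs:
    p.wait()
print(sorted(lines))
```

No pipe is ever left undrained while we block on another one, which is exactly what the question's one-handle-at-a-time loop fails to guarantee.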

That implies that line buffering and all that higher-level stuff needs to be implemented on top of this. Fortunately, the successor of asyncore offers what we need: asyncio subprocesses.

I hope I could explain the deadlock. The solution should come as no surprise: if you need to do several things at once, use either threading or asyncio.
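For completeness, here is a sketch of the threading route (the worker commands and labels are invented for illustration): one reader thread per pipe keeps every pipe drained, so no child can block on a full buffer.

```python
import subprocess
import sys
import threading

# One reader thread per pipe: each thread drains its child's merged
# stdout/stderr as lines arrive, so no pipe ever fills up.
def drain(pipe, label, sink):
    for line in pipe:
        sink.append(f"[{label}] {line.rstrip()}")
    pipe.close()

commands = [[sys.executable, '-c', f"print('worker {i}')"] for i in range(3)]
lines, threads, procs = [], [], []
for i, cmd in enumerate(commands):
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT, text=True)
    t = threading.Thread(target=drain, args=(p.stdout, f"proc{i}", lines))
    t.start()
    procs.append(p)
    threads.append(t)
for t in threads:
    t.join()
for p in procs:
    p.wait()
print('\n'.join(sorted(lines)))
```

list.append is atomic under the GIL, so a plain list works as the shared sink here; a queue.Queue would be the usual choice if the main thread needed to consume lines while the workers are still running.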


UPDATE:

Below is a short asyncio test program. It reads inputs from several child processes and prints the data line by line.

But first, a cmd.py helper that prints a line in several small chunks to demonstrate the line buffering. Try it, e.g. with python3 cmd.py 10.

import sys
import time

def countdown(n):
    print('START', n)
    while n >= 0: 
        print(n, end=' ', flush=True)
        time.sleep(0.1)
        n -= 1
    print('END')

if __name__ == '__main__':
    args = sys.argv[1:]
    if len(args) != 1:
        sys.exit(3)
    countdown(int(args[0]))

And the main program:

import asyncio

PROG = 'cmd.py'
NPROC = 12

async def run1(*execv):
    """Run a program, read input lines."""
    proc = await asyncio.create_subprocess_exec(
        *execv,
        stdin=asyncio.subprocess.DEVNULL,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.DEVNULL)
    # proc.stdout is a StreamReader object
    async for line in proc.stdout:
        print("Got line:", line.decode().strip())

async def manager(prog, nproc):
    """Spawn 'nproc' copies of python script 'prog'."""
    tasks = [asyncio.create_task(run1('python3', prog, str(i))) for i in range(nproc)]
    await asyncio.wait(tasks)

if __name__ == '__main__':
    asyncio.run(manager(PROG, NPROC))

The async for line ... loop is a feature of StreamReader similar to the for line in file: idiom. It can be replaced with:

    while True:
        line = await proc.stdout.readline()
        if not line:
            break
        print("Got line:", line.decode().strip())

answered Nov 02 '25 by VPfB