I'm testing out a way to print the stdout of several subprocesses in Python 2.7. What I have set up is a main process that spawns, at the moment, three subprocesses and prints their output. Each subprocess is a for-loop that goes to sleep for some random amount of time, and when it wakes up, says "Slept for X seconds".
The problem I'm seeing is that the printing out seems synchronous. Say subprocess A sleeps for 1 second, subprocess B sleeps for 3 seconds, and subprocess C sleeps for 10 seconds. The main process stops for the full 10 seconds when it's trying to see if subprocess C has something, even though the other two have probably slept and printed something out. This is to simulate if a subprocess truly has nothing to output for a longer period of time than the other two.
I need a solution which works on Windows.
My code is as follows:
main_process.py
import sys
import subprocess
logfile = open('logfile.txt', 'w')
processes = [
    subprocess.Popen('python subproc_1.py', stdout=subprocess.PIPE, bufsize=1),
    subprocess.Popen('python subproc_2.py', stdout=subprocess.PIPE, bufsize=1),
    subprocess.Popen('python subproc_3.py', stdout=subprocess.PIPE, bufsize=1),
]
while True:
    line = processes[0].stdout.readline()
    if line != '':
        sys.stdout.write(line)
        logfile.write(line)
    line = processes[1].stdout.readline()
    if line != '':
        sys.stdout.write(line)
        logfile.write(line)
    line = processes[2].stdout.readline()
    if line != '':
        sys.stdout.write(line)
        logfile.write(line)
    # If everyone is dead, break
    if processes[0].poll() is not None and \
       processes[1].poll() is not None and \
       processes[2].poll() is not None:
        break
processes[0].wait()
processes[1].wait()
print 'Done'
subproc_1.py/subproc_2.py/subproc_3.py
import time, sys, random
sleep_time = random.random() * 3
for x in range(0, 20):
    print "[PROC1] Slept for {0} seconds".format(sleep_time)
    sys.stdout.flush()
    time.sleep(sleep_time)
    sleep_time = random.random() * 3  # this is different for each subprocess.
Update: Solution
Taking the answer below along with this question, this should work.
import sys
import subprocess
from threading import Thread
try:
from Queue import Queue, Empty
except ImportError:
from queue import Queue, Empty # for Python 3.x
ON_POSIX = 'posix' in sys.builtin_module_names
def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()
if __name__ == '__main__':
    logfile = open('logfile.txt', 'w')
    processes = [
        subprocess.Popen('python subproc_1.py', stdout=subprocess.PIPE, bufsize=1),
        subprocess.Popen('python subproc_2.py', stdout=subprocess.PIPE, bufsize=1),
        subprocess.Popen('python subproc_3.py', stdout=subprocess.PIPE, bufsize=1),
    ]
    q = Queue()
    threads = []
    for p in processes:
        threads.append(Thread(target=enqueue_output, args=(p.stdout, q)))
    for t in threads:
        t.daemon = True
        t.start()
    while True:
        try:
            line = q.get_nowait()
        except Empty:
            pass
        else:
            sys.stdout.write(line)
            logfile.write(line)
            logfile.flush()
        # break when all processes are done.
        if all(p.poll() is not None for p in processes):
            break
    print 'All processes done'
I'm not sure if I need any cleanup code at the end of the while loop. If anyone has comments about it, please add them.
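On the cleanup question: the loop above breaks as soon as every process has exited, so any lines still sitting in the queue at that moment may never be printed. Here is a minimal sketch of one way to handle that, assuming the same reader-thread approach. The helper name `pump_output`, the `sink` callback, and the inline child command are made up for illustration; the code avoids the print statement so it should run on both Python 2.7 and 3.

```python
import subprocess
import sys
from threading import Thread

try:
    from Queue import Queue, Empty  # Python 2
except ImportError:
    from queue import Queue, Empty  # Python 3


def enqueue_output(out, queue):
    # Reader thread: forward each line until the child closes its stdout.
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()


def pump_output(commands, sink):
    """Hypothetical helper: run commands, pass every output line to sink(line)."""
    processes = [subprocess.Popen(cmd, stdout=subprocess.PIPE) for cmd in commands]
    q = Queue()
    threads = [Thread(target=enqueue_output, args=(p.stdout, q)) for p in processes]
    for t in threads:
        t.daemon = True
        t.start()

    # Main loop: wait briefly on the queue instead of spinning on get_nowait().
    while any(t.is_alive() for t in threads):
        try:
            sink(q.get(timeout=0.1))
        except Empty:
            pass

    # Cleanup: every reader has hit EOF, so drain what is left and reap children.
    while True:
        try:
            sink(q.get_nowait())
        except Empty:
            break
    for p in processes:
        p.wait()
    return [p.returncode for p in processes]


if __name__ == '__main__':
    child = "import sys; sys.stdout.write('hello\\n'); sys.stdout.flush()"
    lines = []
    codes = pump_output([[sys.executable, '-c', child]] * 3, lines.append)
    sys.stdout.write('collected %d lines, exit codes %r\n' % (len(lines), codes))
```

The loop condition watches the reader threads rather than `poll()`: a thread only finishes after it has queued the last line of its pipe, so once all threads are done a final non-blocking drain is enough to pick up everything.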
And each subproc script looks similar to this (I edited for the sake of making a better example):
import datetime, time, sys, random
for x in range(0, 20):
    sleep_time = random.random() * 3
    time.sleep(sleep_time)
    timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%H%M%S.%f')
    print "[{0}][PROC1] Slept for {1} seconds".format(timestamp, sleep_time)
    sys.stdout.flush()
print "[{0}][PROC1] Done".format(timestamp)
sys.stdout.flush()
Your problem comes from the fact that readline() is a blocking function; if you call it on a file object and there isn't a line waiting to be read, the call won't return until there is a line of output. So what you have now will read repeatedly from subprocesses 1, 2, and 3 in that order, pausing at each until output is ready.
(Edit: The OP clarified that they're on Windows, which makes the approach below inapplicable: on Windows, select works only on sockets, not on pipes or other file objects.)
If you want to read from whichever output stream is ready, you need to check on the status of the streams in non-blocking fashion, using the select module, and then attempt reads only on those that are ready. select provides various ways of doing this, but for the sake of example we'll use select.select(). After starting your subprocesses, you'll have something like:
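import select  # needed in addition to the imports already in the script

streams = [p.stdout for p in processes]

def output(s):
    # Write to the console and the log file, flushing both immediately.
    for f in [sys.stdout, logfile]:
        f.write(s)
        f.flush()

while True:
    # Block until at least one stream has data, then read only from those.
    rstreams, _, _ = select.select(streams, [], [])
    for stream in rstreams:
        line = stream.readline()
        output(line)
    if all(p.poll() is not None for p in processes):
        break

# Final sweep: pick up anything written just before the processes exited.
for stream in streams:
    output(stream.read())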
What select() does, when called with three lists of file objects (or file descriptors), is return three subsets of its arguments, which are the streams that are ready for reading, are ready for writing, or have an error condition. Thus on each iteration of the loop we check to see which output streams are ready to read, and iterate over just those. Then we repeat. (Note that it's important here that you're line-buffering the output; the above code assumes that if a stream is ready for reading there's at least one full line ready to be read. If you specify different buffering the above can block.)
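To make the return value concrete, here is a small sketch that polls a plain OS pipe before and after writing to it. It is only an illustration and not part of the solution: it assumes a POSIX system, and it passes a timeout of 0 so that select() returns immediately instead of blocking.

```python
import os
import select

# POSIX only: on Windows, select.select() accepts sockets, not pipes.
read_fd, write_fd = os.pipe()

# Nothing has been written yet, so a zero-timeout poll finds nothing readable.
ready_before, _, _ = select.select([read_fd], [], [], 0)

os.write(write_fd, b"hello\n")

# Now the read end has data waiting and appears in the first returned list.
ready_after, _, _ = select.select([read_fd], [], [], 0)

print("ready before write: %r" % (ready_before,))
print("read end ready after write: %r" % (ready_after == [read_fd],))

os.close(read_fd)
os.close(write_fd)
```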
A further problem with your original code: When you exit the loop after poll() reports all subprocesses to have exited, you might not have read all their output. So you need to do a last sweep over the streams to read any remaining output.
Note: The example code I gave doesn't try all that hard to capture the subprocesses' output in exactly the order in which it becomes available (which is impossible to do perfectly, but can be approximated more closely than the above manages to do). It also lacks other refinements (for example, in the main loop it'll continue to select on the stdout of every subprocess, even after some have already terminated, which is harmless, but inefficient). It's just meant to illustrate a basic technique of non-blocking IO.
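As a rough sketch of one such refinement (again POSIX only, so not a fix for the Windows case), the loop below stops selecting on a stream once it reaches end of file. It reads with os.read() on the raw file descriptor, which returns whatever bytes are available rather than waiting for a full line, so it hands over chunks, not lines. The helper name `multiplex` and the `sink` callback are hypothetical.

```python
import os
import select
import subprocess
import sys


def multiplex(processes, sink):
    """Hypothetical helper: pass output chunks to sink(data) as they arrive."""
    # Map each pipe's file descriptor to the process that owns it.
    open_fds = dict((p.stdout.fileno(), p) for p in processes)
    while open_fds:
        ready, _, _ = select.select(list(open_fds), [], [])
        for fd in ready:
            # os.read returns what is available (up to 4096 bytes) without
            # waiting for a full line; an empty result means end of file.
            data = os.read(fd, 4096)
            if data:
                sink(data)
            else:
                # The child closed its stdout: stop selecting on it.
                open_fds.pop(fd).stdout.close()
    for p in processes:
        p.wait()


if __name__ == '__main__':
    child = "import sys; sys.stdout.write('done\\n')"
    procs = [subprocess.Popen([sys.executable, '-c', child], stdout=subprocess.PIPE)
             for _ in range(3)]
    chunks = []
    multiplex(procs, chunks.append)
    sys.stdout.write('received %d bytes\n' % len(b''.join(chunks)))
```

Because the loop runs until every pipe has reported end of file, no separate final sweep is needed; the trade-off is that a caller wanting whole lines has to split the chunks itself.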