The problem: I have a poorly designed Fortran program (I cannot change it, I'm stuck with it) which takes text input from stdin and other input files, and writes text output results to stdout and other output files. The size of the input and output is quite large, and I would like to avoid writing to the hard drive (slow operation). I have written a function that iterates over the lines of the several input files, and I also have parsers for the multiple outputs. I don't really know whether the program first reads all the input and then starts to write output, or starts writing output while it is still reading the input.
The goal: To have a function that feeds the external program with what it wants, and parses the output as it comes from the program, without writing data to text files on the hard drive.
Research: The naive way using files is:
import shlex
from subprocess import PIPE, Popen

def execute_simple(cmd, stdin_iter, stdout_parser, input_files, output_files):
    # Write the additional input files to disk before starting the program
    for filename, file_iter in input_files.iteritems():
        with open(filename, 'w') as f:
            for line in file_iter:
                f.write(line + '\n')

    p_sub = Popen(
        shlex.split(cmd),
        stdin=PIPE,
        stdout=open('stdout.txt', 'w'),
        stderr=open('stderr.txt', 'w'),
        bufsize=1
    )
    for line in stdin_iter:
        p_sub.stdin.write(line + '\n')
    p_sub.stdin.close()
    p_sub.wait()

    data = {}
    for filename, parse_func in output_files.iteritems():
        # stdout.txt and stderr.txt are included here
        with open(filename, 'r') as f:
            data[filename] = parse_func(
                iter(f.readline, '')
            )
    return data
I have tried to use the subprocess module to execute the external program. The additional input/output files are handled with named pipes and multiprocessing. I want to feed stdin with an iterator (which returns the lines of input), save the stderr in a list, and parse the stdout as it comes from the external program. The input and output can be quite large, so using communicate is not feasible.
I have a parser of the form:
def parser(iterator):
    for line in iterator:
        # Do something
        if condition:
            break
    some_other_function(iterator)
    return data
I looked at this solution using select to choose the appropriate stream, however I don't know how to make it work with my stdout parser or how to feed the stdin. I also looked at the asyncio module, but as far as I can see I would have the same problem with the parsing of stdout.
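For reference, the select-based pattern I mean is roughly the following sketch (Unix only; './prog' is just a placeholder command, and feeding stdin is omitted). The streams are read as chunks whenever data becomes available, which is exactly what I don't know how to reconcile with a parser that pulls lines from an iterator:
import os
import select
import subprocess

p = subprocess.Popen(['./prog'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stderr_chunks = []
open_streams = [p.stdout, p.stderr]
while open_streams:
    readable, _, _ = select.select(open_streams, [], [])
    for f in readable:
        chunk = os.read(f.fileno(), 4096)
        if not chunk:                      # EOF on this stream
            open_streams.remove(f)
        elif f is p.stderr:
            stderr_chunks.append(chunk)    # collect stderr
        else:
            pass                           # stdout arrives as pushed chunks here
p.wait()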
You should use named pipes for all input and output to the Fortran program to avoid writing to disk. Then, in your consumer, you can use threads to read from each of the program's output sources and add the information to a Queue for in-order processing.
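The same named-pipe trick also covers the program's extra input files: create a FIFO with the file name the program expects, and have a worker thread open it for writing and feed it the lines, so the Fortran program reads it as if it were an ordinary file. A minimal sketch (the path 'input1.txt' and the line iterator are only placeholders, not part of your setup):
import os
import threading

def feed_input_pipe(path, line_iter):
    # Opening a FIFO for writing blocks until the program opens it for reading,
    # so this must run in a worker thread, not on the main thread.
    with open(path, 'w') as f:
        for line in line_iter:
            f.write(line + '\n')

# os.mkfifo('input1.txt')  # create the FIFO before starting the Fortran program
# threading.Thread(target=feed_input_pipe,
#                  args=('input1.txt', iter(['1', '2', '3']))).start()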
To model this, I created a Python app daemon.py that reads from standard input and returns the square root of each line until EOF. It logs all input to a log file specified as a command-line argument, prints the square roots to stdout and all errors to stderr. I think it simulates your program (of course the number of output files is only one, but it can be scaled). You can view the source code for this test application here. Note the explicit call to stdout.flush(). By default, standard output is buffered, which means output would only appear at the end and messages would not arrive in order. I hope your Fortran application does not buffer its output. I believe my sample application will probably not run on Windows, due to a Unix-only use of select, which shouldn't matter in your case.
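The linked source is the reference; just to make the answer self-contained, a daemon.py matching that description could look roughly like this (Python 2, like the rest of this answer; the details, and the use of select, are omitted here and are my own guess):
#!/usr/bin/env python
# Rough sketch of a test app: square-root each stdin line, log input to the
# file given on the command line, report bad input on stderr, flush stdout.
import sys
import math

logfile = open(sys.argv[1], 'w')           # log file (or named pipe) from argv
for line in iter(sys.stdin.readline, ''):
    line = line.strip()
    logfile.write(line + '\n')
    logfile.flush()
    try:
        print math.sqrt(float(line))
    except ValueError as e:
        sys.stderr.write('error: %s\n' % e)
        sys.stderr.flush()
    sys.stdout.flush()                     # crucial: avoid buffered stdout
logfile.close()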
I have my consumer application which starts the daemon application as a subprocess, with stdin, stdout and stderr redirected to subprocess.PIPEs. Each of these pipes, plus the log-file named pipe, is given to a different thread: one to supply input, and three to handle the log file, errors and standard output respectively. They all add their messages to a shared Queue which your main thread reads from and sends to your parser.
This is my consumer's code:
import os, random, time
import subprocess
import threading
import Queue
import atexit
def setup():
    # make a named pipe for every file the program should write
    logfilepipe = 'logpipe'
    os.mkfifo(logfilepipe)

def cleanup():
    # put your named pipes here to get cleaned up
    logfilepipe = 'logpipe'
    os.remove(logfilepipe)

# run our cleanup code no matter what - avoid leaving pipes laying around
# even if we terminate early with Ctrl-C
atexit.register(cleanup)

# My example iterator that supplies input for the program. You already have an iterator
# so don't worry about this. It just returns a random input from the sample_data list
# until the maximum number of iterations is reached.
class MyIter():
    sample_data = [0, 1, 2, 4, 9, -100, 16, 25, 100, -8, 'seven',
                   10000, 144, 8, 47, 91, 2.4, '^', 56, 18, 77, 94]

    def __init__(self, numiterations=1000):
        self.numiterations = numiterations
        self.current = 0

    def __iter__(self):
        return self

    def next(self):
        self.current += 1
        if self.current > self.numiterations:
            raise StopIteration
        else:
            return random.choice(self.__class__.sample_data)

# Your parse_func function - I just print it out with a [tag] showing its source.
def parse_func(source, line):
    print "[%s] %s" % (source, line)

# Generic function for sending standard input to the program.
# p - a process handle returned by subprocess
def input_func(p, queue):
    # feed the command's stdin line by line
    for line in MyIter(30):  # Limit for testing purposes
        time.sleep(0.1)      # sleep a tiny bit
        p.stdin.write(str(line) + '\n')
        queue.put(('INPUT', line))
    p.stdin.close()
    p.wait()
    # Once our process has ended, tell the main thread to quit
    queue.put(('QUIT', True))

# Generic function for reading output from the program. source can either be a
# named pipe identified by a string, or subprocess.PIPE for stdout and stderr.
def read_output(source, queue, tag=None):
    print "Starting to read output for %r" % source
    if isinstance(source, str):
        # Is a file or named pipe, so open it
        source = open(source, 'r')  # open file with string name
    line = source.readline()
    # enqueue and read lines until EOF
    while line != '':
        queue.put((tag, line.rstrip()))
        line = source.readline()

if __name__ == '__main__':
    cmd = 'daemon.py'
    # set up our FIFOs instead of using files - put file names into setup() and cleanup()
    setup()
    logfilepipe = 'logpipe'
    # Message queue for handling all output, whether it's stdout, stderr, or a file output by our command
    lq = Queue.Queue()
    # open the subprocess for command
    print "Running command."
    p = subprocess.Popen(['/path/to/' + cmd, logfilepipe],
                         stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # Start threads to handle the input and output
    threading.Thread(target=input_func, args=(p, lq)).start()
    threading.Thread(target=read_output, args=(p.stdout, lq, 'OUTPUT')).start()
    threading.Thread(target=read_output, args=(p.stderr, lq, 'ERRORS')).start()
    # open a thread to read any other output files (e.g. log file) as named pipes
    threading.Thread(target=read_output, args=(logfilepipe, lq, 'LOG')).start()

    # Now combine the results from our threads to do what you want
    run = True
    while run:
        (tag, line) = lq.get()
        if tag == 'QUIT':
            run = False
        else:
            parse_func(tag, line)
My iterator returns a random input value (some of which are junk to cause errors). Yours should be a drop-in replacement. The program runs until the end of its input and then waits for the subprocess to complete before enqueueing a QUIT message to your main thread. My parse_func is obviously super simple, simply printing out the message and its source, but you should be able to work from it. The function that reads from an output source is designed to work with both PIPEs and strings. Don't open the named pipes on your main thread: open() blocks until the other end is available, so for file readers (e.g. the log file) it's better to let the child thread open the file and block there. However, we spawn the subprocess on the main thread so we can pass the handles for stdin, stdout and stderr to their respective child threads.
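One possible way (my own addition, not required by the example above) to plug your pull-based stdout parser into this design is to wrap the queue in a generator that yields only the OUTPUT lines and dispatches everything else as before:
# Wrap the shared queue as an iterator of stdout lines for a pull-based parser.
def queue_lines(queue, want_tag='OUTPUT'):
    while True:
        tag, line = queue.get()
        if tag == 'QUIT':
            return
        if tag == want_tag:
            yield line
        else:
            parse_func(tag, line)   # keep handling INPUT/ERRORS/LOG messages as before

# Usage on the main thread, in place of the while-loop above:
# data = parser(queue_lines(lq))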
Based partially on this Python implementation of multitail.