 

Python: Feed and parse stream of data to and from external program with additional input and output files

The problem: I have a poorly designed Fortran program (I cannot change it; I'm stuck with it) which takes text input from stdin and other input files, and writes text output to stdout and other output files. The input and output are quite large, and I would like to avoid writing to the hard drive (a slow operation). I have written a function that iterates over the lines of the several input files, and I also have parsers for the multiple outputs. I don't know whether the program first reads all the input and then starts writing output, or starts outputting while it is still reading the input.

The goal: to have a function that feeds the external program the input it expects, and parses the output as it comes from the program, without writing data to text files on the hard drive.

Research: The naive way using files is:

import shlex
from subprocess import PIPE, Popen

def execute_simple(cmd, stdin_iter, stdout_parser, input_files, output_files):

    for filename, file_iter in input_files.iteritems():
        with open(filename, 'w') as f:
            for line in file_iter:
                f.write(line + '\n')


    p_sub = Popen(
        shlex.split(cmd),
        stdin=PIPE,
        stdout=open('stdout.txt', 'w'),
        stderr=open('stderr.txt', 'w'),
        bufsize=1
    )
    for line in stdin_iter:
        p_sub.stdin.write(line + '\n')

    p_sub.stdin.close()
    p_sub.wait()

    data = {}
    for filename, parse_func in output_files.iteritems():
        # stdout.txt and stderr.txt are included here
        with open(filename,'r') as f:
            data[filename] = parse_func(
                    iter(f.readline, b'')
            )
    return data

I have tried to use the subprocess module to execute the external program, handling the additional input/output files with named pipes and multiprocessing. I want to feed stdin from an iterator (which yields the input lines), save stderr in a list, and parse stdout as it comes from the external program. The input and output can be quite large, so using communicate is not feasible.

I have a parser of the form:

def parser(iterator):
    for line in iterator:
        # Do something
        if condition:
            break
    some_other_function(iterator)
    return data
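
For the stdin/stdout part alone, the shape of what I am after is roughly this (a sketch only; cmd, stdin_iter and stdout_parser are placeholders for my real command, input iterator and parser, and the additional input/output files are not handled here):

import shlex
import threading
from subprocess import PIPE, Popen

def execute_streaming(cmd, stdin_iter, stdout_parser):
    p = Popen(shlex.split(cmd), stdin=PIPE, stdout=PIPE, bufsize=1)

    def feed():
        # push the input lines into the program and signal EOF
        for line in stdin_iter:
            p.stdin.write(line + '\n')
        p.stdin.close()

    threading.Thread(target=feed).start()

    # hand the parser an iterator over the stdout lines as they arrive
    data = stdout_parser(iter(p.stdout.readline, ''))
    p.wait()
    return data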

I looked at this solution using select to choose the appropriate stream, but I don't know how to make it work with my stdout parser or how to feed the stdin.
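
As far as I understand it, the core of that select-based approach is something like the following sketch (with /bin/cat as a stand-in command); what I can't see is how to combine it with my parser and with feeding stdin from an iterator:

import select
import shlex
from subprocess import PIPE, Popen

# stand-in command that just echoes stdin back to stdout
p = Popen(shlex.split('/bin/cat'), stdin=PIPE, stdout=PIPE, stderr=PIPE)
p.stdin.write('hello\n')
p.stdin.close()

streams = [p.stdout, p.stderr]
while streams:
    # wait until at least one stream has data (or EOF) available
    readable, _, _ = select.select(streams, [], [])
    for stream in readable:
        line = stream.readline()
        if line == '':             # EOF on this stream
            streams.remove(stream)
        else:
            print line.rstrip()    # this is where a parser would have to take over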

I also looked at the asyncio module, but as far as I can see I would have the same problem with the parsing of stdout.

hakanc asked Jul 27 '15 13:07

1 Answer

You should use named pipes for all input and output to the Fortran program to avoid writing to disk. Then, in your consumer, you can use threads to read from each of the program's output sources and add the information to a Queue for in-order processing.

To model this, I created a Python app, daemon.py, that reads numbers from standard input and prints their square roots until EOF. It logs all input to a log file specified as a command-line argument, prints the square roots to stdout, and writes all errors to stderr. I think it simulates your program (of course it has only one additional output file, but that can be scaled). You can view the source code for this test application here. Note the explicit call to stdout.flush(). By default, standard output to a pipe is block buffered, so the output would only appear at the end and messages would not arrive in order. I hope your Fortran application does not buffer its output. I believe my sample application will probably not run on Windows, due to a Unix-only use of select, but that shouldn't matter in your case.
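
The full source is behind the link above; in outline it behaves roughly like this sketch (the real daemon.py also uses select and differs in detail):

import sys
import math

def main():
    # log every input line to the file named on the command line
    logfile = open(sys.argv[1], 'w')
    for line in iter(sys.stdin.readline, ''):
        line = line.strip()
        logfile.write(line + '\n')
        logfile.flush()
        try:
            print math.sqrt(float(line))
        except ValueError as e:
            sys.stderr.write('error: %s\n' % e)
        sys.stdout.flush()   # without this, piped output is block buffered
    logfile.close()

if __name__ == '__main__':
    main()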

My consumer application starts the daemon application as a subprocess, with stdin, stdout and stderr redirected to subprocess.PIPE. Each of these, plus the named pipe for the log file, is given to a different thread: one feeds the input, and three handle the log file, the errors and the standard output respectively. They all add their messages to a shared Queue which your main thread reads from and sends to your parser.

This is my consumer's code:

import os, random, time
import subprocess
import threading
import Queue
import atexit

def setup():
    # make a named pipe for every file the program should write
    logfilepipe='logpipe'
    os.mkfifo(logfilepipe)

def cleanup():
    # put your named pipes here to get cleaned up
    logfilepipe='logpipe'
    os.remove(logfilepipe)

# run our cleanup code no matter what - avoid leaving pipes laying around
# even if we terminate early with Ctrl-C
atexit.register(cleanup)

# My example iterator that supplies input for the program. You already have an iterator 
# so don't worry about this. It just returns a random input from the sample_data list
# until the maximum number of iterations is reached.
class MyIter():
    sample_data=[0,1,2,4,9,-100,16,25,100,-8,'seven',10000,144,8,47,91,2.4,'^',56,18,77,94]
    def __init__(self, numiterations=1000):
        self.numiterations=numiterations
        self.current = 0

    def __iter__(self):
        return self

    def next(self):
        self.current += 1
        if self.current > self.numiterations:
            raise StopIteration
        else:
            return random.choice(self.__class__.sample_data)

# Your parse_func function - I just print it out with a [tag] showing its source.
def parse_func(source,line):
    print "[%s] %s" % (source,line)

# Generic function for sending standard input to the program.
# p - a process handle returned by subprocess
def input_func(p, queue):
    # feed each line from the input iterator to the program's stdin
    for line in MyIter(30): # Limit for testing purposes
        time.sleep(0.1) # sleep a tiny bit
        p.stdin.write(str(line)+'\n')
        queue.put(('INPUT', line))
    p.stdin.close()
    p.wait()

    # Once our process has ended, tell the main thread to quit
    queue.put(('QUIT', True))

# Generic function for reading output from the program. source can either be a
# named pipe identified by a string, or an already-open pipe object such as p.stdout or p.stderr.
def read_output(source, queue, tag=None):
    print "Starting to read output for %r" % source
    if isinstance(source,str):
        # Is a file or named pipe, so open it
        source=open(source, 'r') # open file with string name
    line = source.readline()
    # enqueue and read lines until EOF
    while line != '':
        queue.put((tag, line.rstrip()))
        line = source.readline()

if __name__=='__main__':
    cmd='daemon.py'

    # set up our FIFOs instead of using files - put file names into setup() and cleanup()
    setup()

    logfilepipe='logpipe'

    # Message queue for handling all output, whether it's stdout, stderr, or a file output by our command
    lq = Queue.Queue()

    # open the subprocess for command
    print "Running command."
    p = subprocess.Popen(['/path/to/'+cmd, logfilepipe],
                         stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # Start threads to handle the input and output
    threading.Thread(target=input_func, args=(p, lq)).start()
    threading.Thread(target=read_output, args=(p.stdout, lq, 'OUTPUT')).start()
    threading.Thread(target=read_output, args=(p.stderr, lq, 'ERRORS')).start()

    # open a thread to read any other output files (e.g. log file) as named pipes
    threading.Thread(target=read_output, args=(logfilepipe, lq, 'LOG')).start()

    # Now combine the results from our threads to do what you want
    run = True
    while run:
        (tag, line) = lq.get()
        if tag == 'QUIT':
            run=False
        else:
            parse_func(tag, line)

My iterator returns a random input value (some of which are junk, to cause errors). Yours should be a drop-in replacement. The program runs until the end of its input and then waits for the subprocess to complete before enqueueing a QUIT message to your main thread. My parse_func is obviously super simple, it just prints out the message and its source, but you should be able to plug in your own. The function that reads from an output source is designed to work with both open pipes and file names given as strings: don't open the named pipes on your main thread, because they will block until input is available, so for file readers (e.g. reading log files) it's better to have the child thread open the file and block. However, we spawn the subprocess on the main thread so we can pass the handles for stdin, stdout and stderr to their respective child threads.
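
If you would rather keep your iterator-style parsers than a line-at-a-time parse_func, one option (just a sketch, not part of the code above) is to give each tagged stream its own queue, wrap it in a generator, and let your parser consume that generator while the main loop routes lines by tag:

import Queue
import threading

def example_parser(lines):
    # stand-in for one of your parser(iterator) functions
    return [line.upper() for line in lines]

def make_stream():
    # a queue plus a generator that yields its lines until a None sentinel
    q = Queue.Queue()
    def lines():
        for line in iter(q.get, None):
            yield line
    return q, lines()

out_q, out_lines = make_stream()
results = {}
t = threading.Thread(target=lambda: results.update(stdout=example_parser(out_lines)))
t.start()

# In the main loop above you would route by tag instead of calling parse_func:
#   if tag == 'OUTPUT': out_q.put(line)
#   elif tag == 'QUIT': out_q.put(None); run = False
for demo_line in ['3', '7', 'x']:
    out_q.put(demo_line)
out_q.put(None)          # end of the stream
t.join()
print results['stdout']  # ['3', '7', 'X']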

Based partially on this Python implementation of multitail.

Aaron D answered Nov 20 '22 07:11