 

Get output from a Python subprocess job into Tornado

I've searched a lot and haven't found how to get the output of a running Python subprocess into Tornado. What I want is something like Travis CI: on an admin page I start the job, the server receives the request and starts a subprocess. This subprocess does some data mining and feeds a string buffer with logs. I'll fetch those logs via Ajax with setTimeout or a WebSocket and print them on the page. Even if the user closes the page and returns to it later, the logs will still be there and will keep updating. It's really very similar to Travis.

asked Oct 21 '22 by Rubens Pinheiro

1 Answer

This blog post shows a way of doing this: http://stefaanlippens.net/python-asynchronous-subprocess-pipe-reading

Essentially, the post shows how to prevent deadlock when reading the output of a process by asynchronously reading both stdout and stderr. You can replace the producer command in __main__ with whatever command you like, and the print statements with code that handles the output in Tornado.
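Tornado itself also ships a non-blocking subprocess wrapper (tornado.process.Subprocess), which reads the pipes on the IOLoop instead of in threads. The following is a minimal sketch of that route, assuming Tornado 5+; the names LOG_LINES, StartJobHandler and miner.py are illustrative, not part of Tornado or the linked post:

import asyncio
import tornado.web
from tornado.iostream import StreamClosedError
from tornado.process import Subprocess

LOG_LINES = []  # shared server-side buffer the admin page can poll

async def stream_output(stream):
    # Read one line at a time until the child closes the pipe.
    while True:
        try:
            line = await stream.read_until(b'\n')
        except StreamClosedError:
            break
        LOG_LINES.append(line.decode('utf8', 'replace'))

async def run_job(command):
    proc = Subprocess(command, stdout=Subprocess.STREAM, stderr=Subprocess.STREAM)
    # Drain both pipes concurrently so neither fills up and blocks the child.
    await asyncio.gather(stream_output(proc.stdout), stream_output(proc.stderr))
    await proc.wait_for_exit(raise_error=False)

class StartJobHandler(tornado.web.RequestHandler):
    def post(self):
        # Fire and forget: the job keeps logging even if the page is closed.
        asyncio.ensure_future(run_job(['python', 'miner.py']))
        self.write('job started')

Because LOG_LINES lives on the server, the logs survive the user closing the page, which matches the Travis-like behaviour the question describes.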

Update: I have included the following in case the blog gets taken down:

...what if you want to read standard output and error line by line, for example because you want to monitor a longer-running process? On the web you can find many solutions, with varying degrees of complexity, abstraction and dependencies. One solution (with limited code and no dependencies outside the standard library) is to read the pipes in separate threads, so one pipe can't block another.

The code below shows an example implementation. The script is set up in such a way that it is used both as the parent and as the child process.

For the child process: when called with the argument 'produce', the script runs the produce() function, which just renders some lines randomly on standard output and standard error. Between the lines there is a touch of delay to simulate a longer-running process. The parent process (the script called without arguments), implemented in the consume() function, invokes the same script in "child mode" as a subprocess and monitors its output line by line, without knowing in advance from which pipe each line will come.

The AsynchronousFileReader class is for the threads that will read the standard output and error pipes asynchronously and put each line on a queue. The main thread can then monitor the subprocess by watching the lines as they come in on the queues.

import sys
import subprocess
import random
import time
import threading
import queue as Queue  # the Queue module was renamed queue in Python 3

class AsynchronousFileReader(threading.Thread):
    '''
    Helper class to implement asynchronous reading of a file
    in a separate thread. Pushes read lines on a queue to
    be consumed in another thread.
    '''

    def __init__(self, fd, queue):
        assert isinstance(queue, Queue.Queue)
        assert callable(fd.readline)
        threading.Thread.__init__(self)
        self._fd = fd
        self._queue = queue

    def run(self):
        '''The body of the thread: read lines and put them on the queue.'''
        for line in iter(self._fd.readline, ''):
            self._queue.put(line)

    def eof(self):
        '''Check whether there is no more content to expect.'''
        return not self.is_alive() and self._queue.empty()

def consume(command):
    '''
    Example of how to consume standard output and standard error of
    a subprocess asynchronously without risk of deadlocking.
    '''

    # Launch the command as a subprocess in text mode, so readline()
    # returns str and the '' sentinel in AsynchronousFileReader works.
    process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)

    # Launch the asynchronous readers of the process' stdout and stderr.
    stdout_queue = Queue.Queue()
    stdout_reader = AsynchronousFileReader(process.stdout, stdout_queue)
    stdout_reader.start()
    stderr_queue = Queue.Queue()
    stderr_reader = AsynchronousFileReader(process.stderr, stderr_queue)
    stderr_reader.start()

    # Check the queues if we received some output (until there is nothing more to get).
    while not stdout_reader.eof() or not stderr_reader.eof():
        # Show what we received from standard output.
        while not stdout_queue.empty():
            line = stdout_queue.get()
            print('Received line on standard output: ' + repr(line))

        # Show what we received from standard error.
        while not stderr_queue.empty():
            line = stderr_queue.get()
            print('Received line on standard error: ' + repr(line))

        # Sleep a bit before asking the readers again.
        time.sleep(.1)

    # Let's be tidy and join the threads we've started.
    stdout_reader.join()
    stderr_reader.join()

    # Close subprocess' file descriptors.
    process.stdout.close()
    process.stderr.close()

def produce(items=10):
    '''
    Dummy function to randomly render a couple of lines
    on standard output and standard error.
    '''
    for i in range(items):
        output = random.choice([sys.stdout, sys.stderr])
        output.write('Line %d on %s\n' % (i, output))
        output.flush()
        time.sleep(random.uniform(.1, 1))

if __name__ == '__main__':
    # The main flow:
    # if there is a command-line argument 'produce', act as a producer
    # otherwise be a consumer (which launches a producer as subprocess).
    if len(sys.argv) == 2 and sys.argv[1] == 'produce':
        produce(10)
    else:
        # Use the same interpreter that runs this script for the child.
        consume([sys.executable, sys.argv[0], 'produce'])
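
To wire this into Tornado as suggested above, the print statements in consume() could append to a shared buffer that a handler exposes for the page to poll. A rough sketch under that assumption (LOG_LINES and LogHandler are hypothetical names, not from the post):

import json
import tornado.web

LOG_LINES = []  # consume() would do LOG_LINES.append(line) instead of print()

class LogHandler(tornado.web.RequestHandler):
    def get(self):
        # ?since=N lets the page fetch only the lines it hasn't seen yet.
        since = int(self.get_argument('since', '0'))
        self.write(json.dumps({'lines': LOG_LINES[since:],
                               'next': len(LOG_LINES)}))

Note that consume()'s polling loop blocks, so it would have to run on its own thread (or be reworked as a tornado.ioloop.PeriodicCallback) to avoid stalling the IOLoop.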
answered Oct 27 '22 by funkotron