
python subprocess with timeout and large output (>64K)

I want to execute a process, limit its execution time with a timeout in seconds, and capture the output it produces. This needs to work on Windows, Linux and FreeBSD.

I have tried implementing this in three different ways:

  1. cmd - Without timeout and subprocess.PIPE for output capture.

    BEHAVIOUR: Operates as expected but does not support a timeout, and I need a timeout.

  2. cmd_to - With timeout and subprocess.PIPE for output capture.

    BEHAVIOUR: Blocks subprocess execution when output >= 2^16 bytes.

  3. cmd_totf - With timeout and tempfile.NamedTemporaryFile for output capture.

    BEHAVIOUR: Operates as expected but uses temporary files on disk.

These are available below for closer inspection.

As can be seen in the output below, the timeout code blocks the execution of the subprocess when using subprocess.PIPE and the output from the subprocess is >= 2^16 bytes.

The subprocess documentation states that this is expected when calling process.wait() with subprocess.PIPE; however, no warning is given for process.poll(), so what is going wrong here?

I have a solution in cmd_totf which uses the tempfile module, but the tradeoff is that it writes the output to disk, something I would REALLY like to avoid.

So my questions are:

  • What am I doing wrong in cmd_to?
  • Is there a way to do what I want without using tempfiles, i.e. while keeping the output in memory?

Script to generate a bunch of output ('exp_gen.py'):

#!/usr/bin/env python
import sys
output  = "b"*int(sys.argv[1])
print output

Three different implementations (cmd, cmd_to, cmd_totf) of wrappers around subprocess.Popen:

#!/usr/bin/env python
import subprocess, time, tempfile
bufsize = -1

def cmd(cmdline, timeout=60):
  """
  Execute cmdline.
  Uses subprocess and subprocess.PIPE.
  """

  p = subprocess.Popen(
    cmdline,
    bufsize = bufsize,
    shell   = False,
    stdin   = subprocess.PIPE,
    stdout  = subprocess.PIPE,
    stderr  = subprocess.PIPE
  )

  out, err    = p.communicate()
  returncode  = p.returncode

  return (returncode, err, out)

def cmd_to(cmdline, timeout=60):
  """
  Execute cmdline, limit execution time to 'timeout' seconds.
  Uses subprocess and subprocess.PIPE.
  """

  p = subprocess.Popen(
    cmdline,
    bufsize = bufsize,
    shell   = False,
    stdin   = subprocess.PIPE,
    stdout  = subprocess.PIPE,
    stderr  = subprocess.PIPE
  )

  t_begin         = time.time()             # Monitor execution time
  seconds_passed  = 0  

  while p.poll() is None and seconds_passed < timeout:
    seconds_passed = time.time() - t_begin
    time.sleep(0.1)

  #if seconds_passed > timeout:
  #
  #  try:
  #    p.stdout.close()  # If they are not closed the fds will hang around until
  #    p.stderr.close()  # os.fdlimit is exceeded and cause a nasty exception
  #    p.terminate()     # Important to close the fds prior to terminating the process!
  #                      # NOTE: Are there any other "non-freed" resources?
  #  except:
  #    pass
  #  
  #  raise TimeoutInterrupt

  out, err    = p.communicate()
  returncode  = p.returncode

  return (returncode, err, out)

def cmd_totf(cmdline, timeout=60):
  """
  Execute cmdline, limit execution time to 'timeout' seconds.
  Uses subprocess and tempfile instead of subprocess.PIPE.
  """

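  # NOTE: with delete=False these files stay on disk after close();
  # remove them (e.g. with os.remove) once the contents have been read.
  output  = tempfile.NamedTemporaryFile(delete=False)
  error   = tempfile.NamedTemporaryFile(delete=False)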

  p = subprocess.Popen(
    cmdline,
    bufsize = 0,
    shell   = False,
    stdin   = None,
    stdout  = output,
    stderr  = error
  )

  t_begin         = time.time()             # Monitor execution time
  seconds_passed  = 0  

  while p.poll() is None and seconds_passed < timeout:
    seconds_passed = time.time() - t_begin
    time.sleep(0.1)

  #if seconds_passed > timeout:
  #
  #  try:
  #    p.stdout.close()  # If they are not closed the fds will hang around until
  #    p.stderr.close()  # os.fdlimit is exceeded and cause a nasty exception
  #    p.terminate()     # Important to close the fds prior to terminating the process!
  #                      # NOTE: Are there any other "non-freed" resources?
  #  except:
  #    pass
  #  
  #  raise TimeoutInterrupt

  p.wait()

  returncode  = p.returncode

  fd          = open(output.name)
  out         = fd.read()
  fd.close()

  fd  = open(error.name)
  err = fd.read()
  fd.close()

  error.close()
  output.close()

  return (returncode, err, out)

if __name__ == "__main__":

  implementations = [cmd, cmd_to, cmd_totf]
  bytes     = ['65535', '65536', str(1024*1024)]
  timeouts  = [5]

  for timeout in timeouts:    
    for size in bytes:    
      for i in implementations:
        t_begin         = time.time()
        seconds_passed  = 0        
        rc, err, output = i(['exp_gen.py', size], timeout)
        seconds_passed = time.time() - t_begin
        filler = ' '*(8-len(i.func_name))
        print "[%s%s:  timeout=%d,  iosize=%s,  seconds=%f]" % (repr(i.func_name), filler, timeout, size, seconds_passed)

Output from execution:

['cmd'     :  timeout=5,  iosize=65535,  seconds=0.016447]
['cmd_to'  :  timeout=5,  iosize=65535,  seconds=0.103022]
['cmd_totf':  timeout=5,  iosize=65535,  seconds=0.107176]
['cmd'     :  timeout=5,  iosize=65536,  seconds=0.028105]
['cmd_to'  :  timeout=5,  iosize=65536,  seconds=5.116658]
['cmd_totf':  timeout=5,  iosize=65536,  seconds=0.104905]
['cmd'     :  timeout=5,  iosize=1048576,  seconds=0.025964]
['cmd_to'  :  timeout=5,  iosize=1048576,  seconds=5.128062]
['cmd_totf':  timeout=5,  iosize=1048576,  seconds=0.103183]
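
The 5-second rows for cmd_to above are consistent with the child blocking on a full pipe: poll() only checks whether the process has exited and never reads from the pipe. A minimal sketch that reproduces this in isolation follows. It is written for Python 3, and the names CHILD and poll_only are made up for illustration; the 1 MiB size is an assumption chosen to exceed typical pipe buffer sizes.

```python
import subprocess
import sys
import time

# Child that writes 1 MiB to stdout, more than a typical pipe buffer holds.
CHILD = "import sys; sys.stdout.write('b' * (1024 * 1024))"

def poll_only(seconds):
    """Start the child, poll for `seconds` without reading, then drain the pipe."""
    p = subprocess.Popen([sys.executable, "-c", CHILD], stdout=subprocess.PIPE)
    deadline = time.monotonic() + seconds
    while p.poll() is None and time.monotonic() < deadline:
        time.sleep(0.1)              # nothing reads p.stdout inside this loop
    still_running = p.poll() is None
    out, _ = p.communicate()         # reading the pipe lets the child finish
    return still_running, len(out), p.returncode
```

Calling poll_only(1.0) would be expected to report that the child is still running when the polling loop gives up, and that all of the output arrives once communicate() starts reading.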

asked Aug 26 '10 13:08 by safl


2 Answers

Contrary to the warnings in the subprocess documentation, reading directly from process.stdout and process.stderr has provided a better solution.

By better I mean that I can read output from a process that exceeds 2^16 bytes without having to temporarily store the output on disk.

The code follows:

import fcntl
import os
import subprocess
import time

class TimeoutInterrupt(Exception):
    """Raised when the command exceeds its timeout."""
    pass

def nonBlockRead(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except IOError:  # no data available yet on the non-blocking fd
        return ''

def cmd(cmdline, timeout=60):
    """
    Execute cmdline, limit execution time to 'timeout' seconds.
    Uses the subprocess module and subprocess.PIPE.

    Raises TimeoutInterrupt
    """

    p = subprocess.Popen(
        cmdline,
        bufsize = 0, # unbuffered
        shell   = False, # not really needed; it's disabled by default
        stdout  = subprocess.PIPE,
        stderr  = subprocess.PIPE
    )

    t_begin = time.time() # Monitor execution time
    seconds_passed = 0

    stdout = ''
    stderr = ''

    while p.poll() is None and seconds_passed < timeout: # Monitor process
        time.sleep(0.1) # Wait a little
        seconds_passed = time.time() - t_begin

        # p.std* blocks on read(), which messes up the timeout timer.
        # To fix this, we use a nonblocking read()
        # Note: the fcntl module is Unix-only, so this does not work on Windows
        stdout += nonBlockRead(p.stdout)
        stderr += nonBlockRead(p.stderr)

    if seconds_passed >= timeout:
        try:
            p.stdout.close()  # If they are not closed the fds will hang around until
            p.stderr.close()  # os.fdlimit is exceeded and cause a nasty exception
            p.terminate()     # Important to close the fds prior to terminating the process!
                              # NOTE: Are there any other "non-freed" resources?
        except:
            pass

        raise TimeoutInterrupt

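    # The child may have written more output between the last read and its exit
    stdout += nonBlockRead(p.stdout)
    stderr += nonBlockRead(p.stderr)

    returncode  = p.returncode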

    return (returncode, stdout, stderr)
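
For readers on Python 3.3 or later, a portable alternative may be worth considering: Popen.communicate() accepts a timeout argument and raises subprocess.TimeoutExpired when it expires. The sketch below is not the code from this answer; the function name run_with_timeout and the extra timed_out flag in the return value are assumptions made for illustration.

```python
import subprocess
import sys

def run_with_timeout(cmdline, timeout=60):
    """Run cmdline, capture stdout/stderr in memory, kill it after `timeout` seconds.

    Returns (returncode, out, err, timed_out); out and err are bytes.
    """
    p = subprocess.Popen(cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    try:
        # communicate() reads both pipes while it waits, so the child
        # cannot stall on a full pipe buffer.
        out, err = p.communicate(timeout=timeout)
        timed_out = False
    except subprocess.TimeoutExpired:
        p.kill()                      # stop the child ...
        out, err = p.communicate()    # ... then collect what it wrote so far
        timed_out = True
    return p.returncode, out, err, timed_out

if __name__ == "__main__":
    rc, out, err, timed_out = run_with_timeout(
        [sys.executable, "-c", "print('b' * 100000)"], timeout=5)
    print(rc, len(out), timed_out)
```

Because it needs neither fcntl nor temporary files, this approach should also be usable on Windows, although that has not been verified here.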

answered Oct 23 '22 00:10 by safl


Disclaimer: This answer has not been tested on Windows or FreeBSD, but the modules used should work on those systems. I believe it is a working answer to your question; it works for me.

Here's code I just hacked together to solve the problem on Linux. It combines several Stack Overflow threads with my own research in the Python 3 documentation.

Main characteristics of this code:

  • Uses processes, not threads, for blocking I/O because they can more reliably be stopped with p.terminate()
  • Implements a retriggerable timeout watchdog that restarts counting whenever some output happens
  • Implements a long-term timeout watchdog to limit overall runtime
  • Can feed in stdin (although I only need to feed in one-time short strings)
  • Can capture stdout/stderr by the usual Popen means (only stdout is coded, with stderr redirected to stdout, but they can easily be separated)
  • It's almost realtime because it only checks for output every 0.1 seconds. You could easily decrease this or remove the waiting interval
  • Lots of debugging printouts are still in the code (enable them via self.dbg) to see what's happening when.

The only code dependency is enum as implemented here, but the code could easily be changed to work without. It's only used to distinguish the two timeouts - use separate exceptions if you like.

Here's the code - as usual - feedback is highly appreciated: (Edit 29-Jun-2012 - the code is now actually working)

# Python module runcmd
# Implements a class to launch shell commands which
# are killed after a timeout. Timeouts can be reset
# after each line of output
#
# Use inside other script with:
#
# import runcmd
# (return_code, out) = runcmd.RunCmd(['ls', '-l', '/etc'],
#                                    timeout_runtime,
#                                    timeout_no_output,
#                                    stdin_string).go()
#

import multiprocessing
import queue
import subprocess
import time

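# NOTE: this is the third-party enum recipe mentioned above, not the
# standard-library enum module added in Python 3.4. With the stdlib module
# the equivalent call would be enum.Enum('Timeout', 'No Retriggerable Runtime').
import enum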

def timestamp():
    return time.strftime('%Y%m%d-%H%M%S')


class ErrorRunCmd(Exception): pass
class ErrorRunCmdTimeOut(ErrorRunCmd): pass

class Enqueue_output(multiprocessing.Process):
    def __init__(self, out, queue):
        multiprocessing.Process.__init__(self)
        self.out = out
        self.queue = queue
        self.daemon = True
    def run(self):
        try:
            for line in iter(self.out.readline, b''):
                #print('worker read:', line)
                self.queue.put(line)
        except ValueError: pass # Readline of closed file
        self.out.close()

class Enqueue_input(multiprocessing.Process):
    def __init__(self, inp, iterable):
        multiprocessing.Process.__init__(self)
        self.inp = inp
        self.iterable = iterable
        self.daemon = True
    def run(self):
        #print("writing stdin")
        for line in self.iterable:
            self.inp.write(bytes(line,'utf-8'))
        self.inp.close()
        #print("writing stdin DONE")

class RunCmd():
    """RunCmd - class to launch shell commands

    Captures and returns stdout. Kills child after a given
    amount (timeout_runtime) wallclock seconds. Can also
    kill after timeout_retriggerable wallclock seconds.
    This second timer is reset whenever the child does some
    output

       (return_code, out) = RunCmd(['ls', '-l', '/etc'],
                                   timeout_runtime,
                                   timeout_no_output,
                                   stdin_string).go()

    """
    Timeout = enum.Enum('No','Retriggerable','Runtime')

    def __init__(self, cmd, timeout_runtime, timeout_retriggerable, stdin=None):
        self.dbg = False
        self.cmd = cmd
        self.timeout_retriggerable = timeout_retriggerable
        self.timeout_runtime = timeout_runtime
        self.timeout_hit = self.Timeout.No
        self.stdout = '--Cmd did not yield any output--'
        self.stdin = stdin
    def read_queue(self, q):
        time_last_output = None
        try:
            bstr = q.get(False) # non-blocking
            if self.dbg: print('{} chars read'.format(len(bstr)))
            time_last_output = time.time()
            self.stdout += bstr
        except queue.Empty:
            #print('queue empty')
            pass
        return time_last_output
    def go(self):
        if self.stdin:
            pstdin = subprocess.PIPE
        else:
            pstdin = None
        p = subprocess.Popen(self.cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=pstdin)
        pin = None
        if (pstdin):
            pin = Enqueue_input(p.stdin, [self.stdin + '\n'])
            pin.start()
        q = multiprocessing.Queue()
        pout = Enqueue_output(p.stdout, q)
        pout.start()
        try:
            if self.dbg: print('Beginning subprocess with timeout {}/{} s on {}'.format(self.timeout_retriggerable, self.timeout_runtime, time.asctime()))
            time_begin = time.time()
            time_last_output = time_begin
            seconds_passed = 0
            self.stdout = b''
            once = True                 # ensure loop's executed at least once
                                        # some child cmds may exit very fast, but still produce output
            while once or p.poll() is None or not q.empty():
                once = False
                if self.dbg: print('a) {} of {}/{} secs passed and overall {} chars read'.format(seconds_passed, self.timeout_retriggerable, self.timeout_runtime, len(self.stdout)))

                tlo = self.read_queue(q)
                if tlo:
                    time_last_output = tlo

                now = time.time()
                if now - time_last_output >= self.timeout_retriggerable:
                    self.timeout_hit = self.Timeout.Retriggerable
                    raise ErrorRunCmdTimeOut(self)
                if now - time_begin >= self.timeout_runtime:
                    self.timeout_hit = self.Timeout.Runtime
                    raise ErrorRunCmdTimeOut(self)

                if q.empty():
                    time.sleep(0.1)
            # Final try to get "last-millisecond" output
            self.read_queue(q)            
        finally:
            self._close(p, [pout, pin])            
        return (self.returncode, self.stdout)               

    def _close(self, p, procs):
        if self.dbg:
            if self.timeout_hit != self.Timeout.No:
                print('{} A TIMEOUT occurred: {}'.format(timestamp(), self.timeout_hit))
            else:
                print('{} No timeout occurred'.format(timestamp()))
        for process in [proc for proc in procs if proc]:
            try:
                process.terminate()
            except:
                print('{} Process termination raised trouble'.format(timestamp()))
                raise
        try:
            p.stdin.close()
        except: pass
        if self.dbg: print('{} _closed stdin'.format(timestamp()))
        try:
            p.stdout.close()    # If they are not closed the fds will hang around until
        except: pass
        if self.dbg: print('{} _closed stdout'.format(timestamp()))
            #p.stderr.close()   # os.fdlimit is exceeded and cause a nasty exception
        try:
            p.terminate()       # Important to close the fds prior to terminating the process!
                                # NOTE: Are there any other "non-freed" resources?
        except: pass
        if self.dbg: print('{} _closed Popen'.format(timestamp()))
        try:
            self.stdout = self.stdout.decode('utf-8')
        except: pass
        self.returncode = p.returncode
        if self.dbg: print('{} _closed all'.format(timestamp()))

Use with:

import runcmd

cmd = ['ls', '-l', '/etc']

worker = runcmd.RunCmd(cmd,
                       40,    # limit runtime [wallclock seconds]
                       2,     # limit runtime after last output [wallclk secs]
                       ''     # stdin input string
                       )
(return_code, out) = worker.go()

if worker.timeout_hit != worker.Timeout.No:
    print('A TIMEOUT occurred: {}'.format(worker.timeout_hit))
else:
    print('No timeout occurred')


# cmd is a list, which does not support the '{:s}' format spec, so join it first
print("Running '{:s}' returned {:d} and {:d} chars of output".format(' '.join(cmd), return_code, len(out)))
print('Output:')
print(out)

The first argument, cmd, should be a list containing the command and its arguments; it is passed to Popen with shell=False. Both timeouts are in seconds. There is currently no code to disable the timeouts; set timeout_no_output to timeout_runtime to effectively disable the retriggerable timeout. stdin_string can be any string to send to the command's standard input; set it to None if the command needs no input. If a string is provided, a final '\n' is appended.
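
The two watchdogs described in this answer (an overall runtime limit plus a retriggerable no-output limit) can also be sketched more compactly with a reader thread and queue.Queue. This is a simplified illustration under stated assumptions, not a replacement for the module above: it uses a daemon thread instead of a separate process, does not feed stdin, and the names _reader and run_watchdog and the string reasons are hypothetical.

```python
import queue
import subprocess
import sys
import threading
import time

def _reader(stream, q):
    """Forward every line from stream into q; None marks end of output."""
    for line in iter(stream.readline, b""):
        q.put(line)
    stream.close()
    q.put(None)

def run_watchdog(cmdline, timeout_runtime, timeout_no_output):
    """Return (returncode, output, reason); reason is None, 'runtime' or 'no_output'."""
    p = subprocess.Popen(cmdline, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    q = queue.Queue()
    threading.Thread(target=_reader, args=(p.stdout, q), daemon=True).start()
    chunks, reason = [], None
    deadline = time.monotonic() + timeout_runtime
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            reason = "runtime"
            break
        try:
            # Waiting at most timeout_no_output per line restarts the
            # idle timer every time a line arrives.
            line = q.get(timeout=min(timeout_no_output, remaining))
        except queue.Empty:
            reason = "no_output" if remaining > timeout_no_output else "runtime"
            break
        if line is None:          # reader saw EOF: the child closed its output
            break
        chunks.append(line)
    if reason is None:
        try:                      # output ended; give the child until the deadline to exit
            p.wait(timeout=max(0.0, deadline - time.monotonic()))
        except subprocess.TimeoutExpired:
            reason = "runtime"
    if reason is not None:
        p.kill()
        p.wait()
    return p.returncode, b"".join(chunks), reason
```

A call such as run_watchdog([sys.executable, "-c", "print('hello')"], 40, 2) mirrors the usage example above. The trade-off is the one named in the list of characteristics: a reader thread cannot be terminated from outside, so it relies on the child's pipe closing once the child has been killed.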

answered Oct 23 '22 02:10 by cfi