I'm using a recipe that relies on SIGALRM to set an alarm interrupt: "Using module 'subprocess' with timeout".
The problem is that I have more than one Python script using signal.SIGALRM to set timeouts, and only the latest alarm gets called. What is a good way to handle multiple Python functions setting timeouts?
With subprocess.run, the timeout argument is passed to Popen.communicate(). If the timeout expires, the child process will be killed and waited for. The TimeoutExpired exception will be re-raised after the child process has terminated.
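As a rough illustration of that behaviour, here is a minimal sketch using subprocess.run on Python 3.7 or later. The helper name run_with_timeout and the Python one-liner used as the slow child are made up for this example; they are not part of the original recipe.

```python
import subprocess
import sys


def run_with_timeout(cmd, seconds):
    """Hypothetical helper: return (finished, stdout) for a command."""
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=seconds)
    except subprocess.TimeoutExpired:
        # subprocess.run has already killed and reaped the child at this point.
        return False, None
    return True, result.stdout


# The child is a Python one-liner so the sketch does not depend on a `sleep` binary.
slow = [sys.executable, "-c", "import time; time.sleep(5)"]
print(run_with_timeout(slow, 0.5))
```

Because each call carries its own timeout, several scripts or functions can use this at once without fighting over a single process-wide alarm.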
Popen is more general than subprocess.call. Popen doesn't block, allowing you to interact with the process while it's running, or continue with other things in your Python program. The call to Popen returns a Popen object.
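A small sketch of the non-blocking behaviour, assuming a child that simply sleeps for a second (the one-liner is only a stand-in for a real command):

```python
import subprocess
import sys

# Popen returns immediately; the child keeps running in the background.
proc = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(1)"])

# poll() returns None while the child is still alive.
still_running = proc.poll() is None

# ... the parent is free to do other work here ...

# wait() blocks until the child exits and returns its exit code.
exit_code = proc.wait()
print(still_running, exit_code)
```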
Then timing out a test, or any function you like, is as simple as this:

    @timeout(5.0)  # if execution takes longer than 5 seconds, raise a TimeoutError
    def test_base_regression(self):
        ...

Be careful: this does not terminate the function once the timeout is reached!
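The decorator itself isn't shown in the snippet above, so the following is only a guess at how such a timeout decorator could be built, here with a worker thread from concurrent.futures. It is a sketch under that assumption, not the implementation the snippet refers to.

```python
import concurrent.futures
import functools


def timeout(seconds):
    """Hypothetical decorator: raise TimeoutError if the call runs too long."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
            future = executor.submit(func, *args, **kwargs)
            try:
                return future.result(timeout=seconds)
            except concurrent.futures.TimeoutError:
                # The caller stops waiting, but the worker thread keeps running.
                raise TimeoutError(
                    "%s took longer than %s seconds" % (func.__name__, seconds)
                ) from None
            finally:
                # Do not block here waiting for the (possibly still running) call.
                executor.shutdown(wait=False)
        return wrapper
    return decorator
```

This makes the warning concrete: a thread cannot be forcibly stopped from outside, so the decorated function carries on in the background after TimeoutError has been raised. If the work must really stop, it has to run in a separate process that can be killed.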
subprocess.Popen takes a cwd argument to set the current working directory; you'll also want to escape your backslashes ('d:\\test\\local'), or use r'd:\test\local' so that the backslashes aren't interpreted as escape sequences by Python. The way you have it written, the \t part will be translated to a tab.
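A short sketch of both points. The temporary directory stands in for a real path such as d:\test\local so the example runs anywhere, and child_cwd is a made-up helper name.

```python
import os
import subprocess
import sys
import tempfile

# A raw string and an escaped string spell the same path; neither contains a tab.
raw_path = r'd:\test\local'
escaped_path = 'd:\\test\\local'
same = raw_path == escaped_path


def child_cwd(directory):
    """Hypothetical helper: start a child in `directory` and report its cwd."""
    out = subprocess.check_output(
        [sys.executable, "-c", "import os; print(os.getcwd())"],
        cwd=directory,
        text=True,
    )
    return out.strip()


with tempfile.TemporaryDirectory() as tmp:
    reported = child_cwd(tmp)
    # realpath() smooths over symlinked temp directories on some platforms.
    matches = os.path.realpath(reported) == os.path.realpath(tmp)

print(same, matches)
```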
Except for simple, quick hacks, avoid SIGALRM. It's a very old, limited mechanism, not suited to anything more complex: you can only set a single alarm, and it interrupts any system call at the time rather than just the one you intend to interrupt.
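The single-alarm limitation is easy to see for yourself. A minimal sketch, Unix only since signal.alarm() does not exist on Windows; the variable names are just for illustration:

```python
import signal

# Schedule an alarm 10 seconds from now.
signal.alarm(10)

# Scheduling a second alarm replaces the first one. The return value is the
# number of seconds that were left on the alarm that just got cancelled.
replaced_remaining = signal.alarm(20)

# Cancel the pending alarm so nothing fires after this demonstration.
signal.alarm(0)

print("seconds left on the replaced alarm:", replaced_remaining)
```

Only one alarm is ever pending per process, which is why the most recent call wins when several pieces of code each set their own.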
It's much cleaner to use a timeout thread to kill the process, for example:
    import errno
    import os
    import signal
    import subprocess
    import threading
    from contextlib import contextmanager


    class TimeoutThread(object):
        """Runs timed_out() after `seconds` unless cancel() is called first."""

        def __init__(self, seconds):
            self.seconds = seconds
            self.cond = threading.Condition()
            self.cancelled = False
            self.thread = threading.Thread(target=self._wait)

        def run(self):
            """Begin the timeout."""
            self.thread.start()

        def _wait(self):
            with self.cond:
                # Returns early if cancel() notifies, otherwise after the timeout.
                self.cond.wait(self.seconds)
                if not self.cancelled:
                    self.timed_out()

        def cancel(self):
            """Cancel the timeout, if it hasn't yet occurred."""
            with self.cond:
                self.cancelled = True
                self.cond.notify()
            self.thread.join()

        def timed_out(self):
            """The timeout has expired."""
            raise NotImplementedError


    class KillProcessThread(TimeoutThread):
        def __init__(self, seconds, pid):
            super(KillProcessThread, self).__init__(seconds)
            self.pid = pid

        def timed_out(self):
            try:
                os.kill(self.pid, signal.SIGKILL)
            except OSError as e:
                # If the process is already gone, ignore the error.
                if e.errno not in (errno.EPERM, errno.ESRCH):
                    raise


    @contextmanager
    def processTimeout(seconds, pid):
        timeout = KillProcessThread(seconds, pid)
        timeout.run()
        try:
            yield
        finally:
            timeout.cancel()


    def example():
        proc = subprocess.Popen(["sleep", "5"], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        with processTimeout(1, proc.pid):
            print(proc.communicate())
            resultcode = proc.wait()
        # A negative return code means the child was killed by a signal.
        if resultcode < 0:
            print("error: %i" % resultcode)


    if __name__ == '__main__':
        example()
Depending on what you're timing out, you may want to use a lighter signal than SIGKILL to allow the timing-out process to clean up after itself.
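One way to do that, sketched here with the Popen methods rather than os.kill, is to ask politely first and only force the issue after a grace period. The function name stop_gracefully and the two-second default are assumptions for this example.

```python
import subprocess
import sys


def stop_gracefully(proc, grace_seconds=2.0):
    """Hypothetical helper: terminate a child, escalating to kill if needed."""
    proc.terminate()  # SIGTERM on POSIX, so the child can run cleanup handlers
    try:
        return proc.wait(timeout=grace_seconds)
    except subprocess.TimeoutExpired:
        proc.kill()  # SIGKILL on POSIX; cannot be caught or ignored
        return proc.wait()


if __name__ == "__main__":
    child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(30)"])
    print("exit code:", stop_gracefully(child))
```

The same escalation could be placed inside timed_out() of a timeout thread; whether a grace period is worth the extra delay depends on how much cleanup the child needs.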