I have a timeout context manager that works perfectly with signals, but it raises an error in multithreaded mode because signals work only in the main thread.
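import signal
from contextlib import contextmanager

class TimeoutException(Exception):  # assumed definition; not shown in the original post
    pass

def timeout_handler(signum, frame):
    raise TimeoutException()

@contextmanager
def timeout(seconds):
    # Install the handler and arm the alarm (SIGALRM is Unix-only)
    old_handler = signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        # Cancel any pending alarm and restore the previous handler
        signal.alarm(0)
        signal.signal(signal.SIGALRM, old_handler)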
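A quick way to see the main-thread restriction is to attempt the registration from a worker thread. This is a minimal sketch, not part of the original code: it uses SIGINT instead of SIGALRM only so that it also runs on platforms without SIGALRM, and try_install_handler is a hypothetical helper name.

```python
import signal
import threading

def try_install_handler(result):
    """Attempt to register a signal handler from the current thread."""
    try:
        signal.signal(signal.SIGINT, signal.default_int_handler)
        result["error"] = None
    except ValueError as exc:
        # CPython rejects signal.signal() outside the main thread.
        result["error"] = exc

result = {}
worker = threading.Thread(target=try_install_handler, args=(result,))
worker.start()
worker.join()
print(type(result["error"]).__name__)  # prints "ValueError" on CPython
```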
I've seen a decorator implementation of a timeout, but I don't know how to use yield inside a class derived from threading.Thread. My variant doesn't work:
@contextmanager
def timelimit(seconds):
    class FuncThread(threading.Thread):
        def run(self):
            yield
    it = FuncThread()
    it.start()
    it.join(seconds)
    if it.is_alive():
        raise TimeoutException()
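For comparison, the thread-based timeout mentioned above usually wraps a whole function call rather than a with block, since a context manager cannot hand its body to another thread. A minimal sketch of that idea follows; run_with_timeout is a hypothetical helper name, and TimeoutException is redefined here only to keep the example self-contained.

```python
import threading

class TimeoutException(Exception):
    """Hypothetical exception type, mirroring the name used in the question."""

def run_with_timeout(func, seconds, *args, **kwargs):
    """Run func in a worker thread and wait at most `seconds` for its result."""
    outcome = {}

    def target():
        try:
            outcome["value"] = func(*args, **kwargs)
        except BaseException as exc:  # forward any failure to the caller
            outcome["error"] = exc

    worker = threading.Thread(target=target, daemon=True)
    worker.start()
    worker.join(seconds)
    if worker.is_alive():
        # The worker keeps running in the background; Python cannot kill it.
        raise TimeoutException("call exceeded %s seconds" % seconds)
    if "error" in outcome:
        raise outcome["error"]
    return outcome["value"]
```

Note that the caller only stops waiting on timeout; the worker thread is not interrupted, which is the main limitation of this approach.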
I know it's late, but I'm only just reading this. What about creating your own signaller/context manager? I'm new to Python and would love feedback from experienced devs on this implementation.
This is based on the answer from "Mr Fooz".
import datetime
import time
from threading import Thread

class TimeoutSignaller(Thread):
    def __init__(self, limit, handler):
        Thread.__init__(self)
        self.limit = limit
        self.running = True
        self.handler = handler
        assert callable(handler), "Timeout Handler needs to be a method"
    def run(self):
        timeout_limit = datetime.datetime.now() + datetime.timedelta(seconds=self.limit)
        while self.running:
            if datetime.datetime.now() >= timeout_limit:
                self.handler()
                self.stop_run()
                break
            time.sleep(0.01)  # yield the CPU instead of busy-waiting
    def stop_run(self):
        self.running = False
class ProcessContextManager:
    def __init__(self, process, seconds=0, minutes=0, hours=0):
        self.seconds = (hours * 3600) + (minutes * 60) + seconds
        self.process = process
        self.signal = TimeoutSignaller(self.seconds, self.signal_handler)
    def __enter__(self):
        self.signal.start()
        return self.process
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.signal.stop_run()
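    def signal_handler(self):
        # Make process terminate however you like
        # using self.process reference.
        # Note: this runs in the TimeoutSignaller thread, so the exception
        # below ends that thread and does not propagate into the with block.
        raise TimeoutError("Process took too long to execute")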
Use case:
with ProcessContextManager(my_proc, seconds=5) as p:  # the limit defaults to 0, so pass one explicitly
    # do stuff e.g.
    p.execute()
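One possible refinement, offered as a sketch rather than a definitive implementation: the signaller can block on a threading.Event instead of polling the clock in a loop. EventTimeoutSignaller is a hypothetical name; it keeps the same limit, handler, and stop_run interface as the class above.

```python
import threading

class EventTimeoutSignaller(threading.Thread):
    """Hypothetical variant of TimeoutSignaller that sleeps on an Event."""

    def __init__(self, limit, handler):
        super().__init__(daemon=True)
        self.limit = limit
        self.handler = handler
        self._cancelled = threading.Event()

    def run(self):
        # wait() returns True if stop_run() set the event before the limit,
        # and False if the full limit elapsed first.
        if not self._cancelled.wait(self.limit):
            self.handler()

    def stop_run(self):
        self._cancelled.set()
```

The handler still runs in the signaller thread, so it should act on the guarded object (for example, terminate a subprocess or set a flag) rather than raise. The standard library's threading.Timer offers much the same behaviour through start() and cancel().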
If the code guarded by the context manager is loop-based, consider handling this the way people handle thread killing. Killing another thread is generally unsafe, so the standard approach is to have the controlling thread set a flag that's visible to the worker thread. The worker thread periodically checks that flag and cleanly shuts itself down. Here's how you can do something analogous with timeouts:
import time
class timeout(object):
    def __init__(self, seconds):
        self.seconds = seconds
    def __enter__(self):
        self.die_after = time.time() + self.seconds
        return self
    def __exit__(self, type, value, traceback):
        pass
    @property
    def timed_out(self):
        return time.time() > self.die_after
Here's a single-threaded usage example:
with timeout(1) as t:
    while True: # this will take a long time without a timeout
        # periodically check for timeouts
        if t.timed_out:
            break # or raise an exception
        # do some "useful" work
        print(".")
        time.sleep(0.2)
and a multithreaded one:
import threading
def print_for_n_secs(string, seconds):
    with timeout(seconds) as t:
        while True:
            if t.timed_out:
                break # or raise an exception
            print(string, end=" ")
            time.sleep(0.5)
for i in range(5):
    threading.Thread(target=print_for_n_secs,
                     args=('thread%d' % (i,), 2)).start()
    time.sleep(0.25)
This approach is more intrusive than using signals, but it works for arbitrary threads.
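If you'd rather raise than break, the same idea can be wrapped in a check() method. This is a rough sketch under a few assumptions: Deadline, DeadlineExceeded, and count_until_deadline are hypothetical names, and time.monotonic() is swapped in for time.time() so that system clock adjustments don't affect the deadline.

```python
import time

class DeadlineExceeded(Exception):
    """Hypothetical exception raised when the time budget is used up."""

class Deadline:
    """Cooperative timeout: the guarded code calls check() periodically."""

    def __init__(self, seconds):
        self.seconds = seconds

    def __enter__(self):
        # time.monotonic() is unaffected by system clock adjustments.
        self.die_after = time.monotonic() + self.seconds
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        return False  # never swallow exceptions, including DeadlineExceeded

    def check(self):
        if time.monotonic() > self.die_after:
            raise DeadlineExceeded("exceeded %s seconds" % self.seconds)

def count_until_deadline(seconds):
    """Do small units of work until the deadline raises."""
    steps = 0
    try:
        with Deadline(seconds) as d:
            while True:
                d.check()
                steps += 1
                time.sleep(0.01)
    except DeadlineExceeded:
        return steps
```

Because nothing here depends on signals, count_until_deadline can be called from the main thread or from any worker thread.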