Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Is there an alternative to the threading.Condition variables in python that better support timeouts without polling?

I'm using condition variables in threads that require a timeout. I didn't notice until I saw the CPU usage when having a lot of threads running, that the condition variable provided in the threading module doesn't actually sleep, but polls when a timeout is provided as an argument.

Is there an alternative to this that actually sleeps like pthreads?

Seems painful to have a lot of threads sleeping at multiple second intervals only to have it still eating CPU time.

Thanks!

like image 667
goji Avatar asked Sep 03 '25 08:09

goji


1 Answers

This seems tricky to do in Python, but here is a one solution. It relies on spawning additional threads but doesn't use polling AND ensures that the original thread is woken up as soon as the timeout expires or as soon as the original wait() returns.

Note: The following code includes a test case which tests both the conditional wait ending due to a timeout as well as due to a notification.

from thread import start_new_thread
from threading import Condition, Timer

class ConditionWithoutPolling():
    """Implements wait() with a timeout without polling.  Wraps the Condition
    class."""
    def __init__(self, condition):
        self.condition = condition
        self.wait_timeout_condition = Condition()

    def wait(self, timeout=None):
        """Same as Condition.wait() but it does not use a poll-and-sleep method
        to implement timeouts.  Instead, if a timeout is requested two new
        threads are spawned to implement a non-pol-and-wait method."""
        if timeout is None:
            # just use the original implementation if no waiting is involved
            self.condition.wait()
            return
        else:
            # this new boolean will tell us whether we are done waiting or not
            done = [False]

            # wait on the original condition in a new thread
            start_new_thread(self.wait_on_original, (done,))

            # wait for a timeout (without polling) in a new thread
            Timer(timeout, lambda : self.wait_timed_out(done)).start()

            # wait for EITHER of the previous threads to stop waiting
            with self.wait_timeout_condition:
                while not done[0]:
                    self.wait_timeout_condition.wait()

    def wait_on_original(self, done):
        """Waits on the original Condition and signals wait_is_over when done."""
        self.condition.wait()
        self.wait_is_over(done)

    def wait_timed_out(self, done):
        """Called when the timeout time is reached."""
        # we must re-acquire the lock we were waiting on before we can return
        self.condition.acquire()
        self.wait_is_over(done)

    def wait_is_over(self, done):
        """Modifies done to indicate that the wait is over."""
        done[0] = True
        with self.wait_timeout_condition:
            self.wait_timeout_condition.notify()

    # wrap Condition methods since it wouldn't let us subclass it ...
    def acquire(self, *args):
        self.condition.acquire(*args)
    def release(self):
        self.condition.release()
    def notify(self):
        self.condition.notify()
    def notify_all(self):
        self.condition.notify_all()
    def notifyAll(self):
        self.condition.notifyAll()

def test(wait_timeout, wait_sec_before_notification):
    import time
    from threading import Lock
    lock = Lock()
    cwp = ConditionWithoutPolling(Condition(lock))
    start = time.time()

    def t1():
        with lock:
            print 't1 has the lock, will wait up to %f sec' % (wait_timeout,)
            cwp.wait(wait_timeout)
        time_elapsed = time.time() - start
        print 't1: alive after %f sec' % (time_elapsed,)        

    # this thread will acquire the lock and then conditionally wait for up to 
    # timeout seconds and then print a message 
    start_new_thread(t1, ())

    # wait until it is time to send the notification and then send it
    print 'main thread sleeping (will notify in %f sec)' % (wait_sec_before_notification,)
    time.sleep(wait_sec_before_notification)
    with lock:
        cwp.notifyAll()
        print 'notification sent, will continue in 2sec'
    time.sleep(2.0) # give the other time thread to finish before exiting

if __name__ == "__main__":
    print 'test wait() ending before the timeout ...'
    test(2.0, 1.0)

    print '\ntest wait() ending due to the timeout ...'
    test(2.0, 4.0)
like image 82
David Underhill Avatar answered Sep 04 '25 20:09

David Underhill