Python threading.Event() - Ensuring all waiting threads wake up on event.set()

I have a number of threads which wait on an event, perform some action, then wait on the event again. Another thread will trigger the event when it's appropriate.

I can't figure out a way to ensure that each waiting thread runs exactly once each time the event is set. I currently have the triggering thread set it, sleep for a bit, then clear it. Unfortunately, this leads to the waiting threads seeing the set event many times, or not at all.

I can't simply have the triggering thread spawn the response threads to run them once because they're responses to requests made from elsewhere.

In short: In Python, how can I have a thread set an event and ensure each waiting thread acts on the event exactly once before it gets cleared?

Update:

I've tried setting it up using a lock and a queue, but it doesn't work. Here's what I have:

# Globals - used to synch threads
waitingOnEvent = Queue.Queue()
MainEvent = threading.Event()
MainEvent.clear()    # Not sure this is necessary, but figured I'd be safe
mainLock = threading.Lock()

def waitCall():
    mainLock.acquire()
    waitingOnEvent.put("waiting")
    mainLock.release()
    MainEvent.wait()
    waitingOnEvent.get(False)
    waitingOnEvent.task_done()
    #do stuff
    return

def triggerCall():
    mainLock.acquire()
    itemsinq = waitingOnEvent.qsize()
    MainEvent.set()
    waitingOnEvent.join()
    MainEvent.clear()
    mainLock.release()
    return

The first time, itemsinq properly reflects how many calls are waiting, but only the first waiting thread to make the call will make it through. From then on, itemsinq is always 1, and the waiting threads take turns; each time the trigger call happens, the next goes through.

Update 2:

It appears as though only one of the event.wait() threads is waking up, and yet the queue.join() is working. This suggests to me that one waiting thread wakes up, grabs from the queue and calls task_done(), and that single get()/task_done() somehow empties the queue and allows the join(). The trigger thread then completes the join(), clears the event, and thus prevents the other waiting threads from going through. Why would the queue register as empty/finished after only one get/task_done call, though?

Only one seems to be waking up, even if I comment out the queue.get() and queue.task_done() and hang the trigger so it can't clear the event.

culhantr asked Aug 04 '10 20:08


1 Answer

You don't need an Event, and you don't need both a Lock and a Queue. All you need is a Queue.

Call queue.put to drop a message in without waiting for it to be delivered or processed.

Call queue.get in the worker thread to wait for a message to arrive.

import threading
try:
    import queue            # Python 3 module name
except ImportError:
    import Queue as queue   # Python 2 module name

active_queues = []  # one mailbox per running worker

class Worker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.mailbox = queue.Queue()
        active_queues.append(self.mailbox)

    def run(self):
        while True:
            data = self.mailbox.get()  # blocks until a message arrives
            if data == 'shutdown':
                print('%s shutting down' % self)
                return
            print('%s received a message: %s' % (self, data))

    def stop(self):
        # Stop receiving broadcasts, then ask the thread to exit and wait for it.
        active_queues.remove(self.mailbox)
        self.mailbox.put("shutdown")
        self.join()


def broadcast_event(data):
    # One put() per mailbox, so every worker sees each event exactly once.
    for q in active_queues:
        q.put(data)

t1 = Worker()
t2 = Worker()
t1.start()
t2.start()
broadcast_event("first event")
broadcast_event("second event")
broadcast_event("shutdown")

t1.stop()
t2.stop()
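
To tie this back to the "exactly once" requirement in the question, here is a rough sketch that records what each worker receives so the claim can be checked. It is written for Python 3 (where the module is named `queue`), and `RecordingWorker`, `broadcast`, and `run_demo` are made-up names for illustration, not part of the code above.

```python
import queue      # named `Queue` in Python 2
import threading


class RecordingWorker(threading.Thread):
    """Hypothetical variant of Worker that records what it receives."""

    def __init__(self):
        super().__init__()
        self.mailbox = queue.Queue()
        self.received = []

    def run(self):
        while True:
            data = self.mailbox.get()   # blocks until a message arrives
            if data == 'shutdown':
                return
            self.received.append(data)


def broadcast(workers, data):
    # One put() per mailbox: each worker gets its own copy of the message.
    for w in workers:
        w.mailbox.put(data)


def run_demo(n_workers=3, n_events=5):
    workers = [RecordingWorker() for _ in range(n_workers)]
    for w in workers:
        w.start()
    for i in range(n_events):
        broadcast(workers, 'event %d' % i)
    broadcast(workers, 'shutdown')
    for w in workers:
        w.join()                        # after join(), `received` is final
    return [w.received for w in workers]


if __name__ == '__main__':
    for received in run_demo():
        print(received)
```

Because every worker owns its mailbox, nothing has to be cleared after a broadcast, and a worker that is busy when the message is sent simply picks it up on its next `get()`.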

The messages don't have to be strings; they can be any Python object.
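
One place where a non-string message can help is the shutdown signal itself: a unique sentinel object cannot be confused with a real message that happens to be the string 'shutdown'. The following is a minimal sketch of that idea for Python 3; `SHUTDOWN`, `worker`, and `demo` are names invented for this example, and the dict and tuple payloads are arbitrary.

```python
import queue      # named `Queue` in Python 2
import threading

SHUTDOWN = object()   # unique sentinel; no ordinary message is this object


def worker(mailbox, results):
    while True:
        msg = mailbox.get()
        if msg is SHUTDOWN:       # identity check, not equality
            return
        results.append(msg)


def demo():
    mailbox = queue.Queue()
    results = []
    t = threading.Thread(target=worker, args=(mailbox, results))
    t.start()
    mailbox.put({'request_id': 7, 'action': 'refresh'})   # a dict
    mailbox.put(('resize', 640, 480))                     # a tuple
    mailbox.put('shutdown')       # here this is just an ordinary string
    mailbox.put(SHUTDOWN)         # this is what actually stops the worker
    t.join()
    return results


if __name__ == '__main__':
    print(demo())
```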

Jason Orendorff answered Nov 15 '22 18:11