
How to wait until a multithreaded queue is not empty without wasting too many CPU cycles

I want to make a thread wait until a multithreaded queue is not empty. The queue has only one producer and one consumer. The producer places tasks in the queue as they become available, but the consumer has to wait until two or more tasks have been gathered. I don't simply call get() twice to retrieve two tasks because that overcomplicates the flow of the algorithm. The snippet below cannot show that, since it is just an oversimplified example.

I need to know that the queue is not empty so that I can compare the head of the queue (without removing it) with the element I just removed with get().

How it could be done with sleep:

while myQueue.empty():
    sleep(0.05)

How can I do that without using sleep? Should I use event.wait()? If so, I cannot figure out how to use event.clear() properly: the thread I want to make wait is also the consumer, and I cannot be sure whether the queue is empty, even if I check with queue.empty().

LetsPlayYahtzee asked Dec 15 '22 08:12


2 Answers

Essentially, it seems you need a Queue.peek() method, which would return the next element in the queue without actually removing it.

This method is not available in the standard Queue object, but you can inherit and expand it without problems:

from Queue import Queue
class VoyeurQueue(Queue):
    def peek(self, block=True, timeout=None):
        pass  # placeholder so the skeleton parses; the body is filled in below

For the body of the new peek() method, you can copy the body of the get() method of the base Queue class and modify it slightly. You can find it at /usr/lib/python?.?/Queue.py if you're on Linux, or %PYTHONPATH%/lib/Queue.py if you're on Windows (I'm not sure about the latter, as I'm currently on a Linux machine and cannot check). In my copy of Python 2.7, the get() method is implemented as:

def get(self, block=True, timeout=None):
    # ... lots of comments
    self.not_empty.acquire()
    try:
        if not block:
            if not self._qsize():
                raise Empty
        elif timeout is None:
            while not self._qsize():
                self.not_empty.wait()
        elif timeout < 0:
            raise ValueError("'timeout' must be a non-negative number")
        else:
            endtime = _time() + timeout
            while not self._qsize():
                remaining = endtime - _time()
                if remaining <= 0.0:
                    raise Empty
                self.not_empty.wait(remaining)
        item = self._get()
        self.not_full.notify()
        return item
    finally:
        self.not_empty.release()

def _get(self):
    return self.queue.popleft()

Now, for differences. You don't want to remove the element, so instead of _get() we define the following:

def _peek(self):
    return self.queue[0]

In the peek() method, we still use the self.not_empty Condition, but we no longer need self.not_full.notify(): nothing is removed, so no slot is freed for a waiting producer. The resulting code looks like this:

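from Queue import Queue, Empty  # Python 2 module name; Empty is raised by peek()
from time import time as _time  # peek() uses _time() to compute the timeout deadline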

class VoyeurQueue(Queue):

    def peek(self, block=True, timeout=None):
        self.not_empty.acquire()
        try:
            if not block:
                if not self._qsize():
                    raise Empty
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                endtime = _time() + timeout
                while not self._qsize():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._peek()
            return item
        finally:
            self.not_empty.release()

    def _peek(self):
        return self.queue[0]
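
For anyone on Python 3, here is a minimal sketch of the same idea, assuming the internals used above (not_empty, _qsize() and the underlying queue deque) are still present in queue.Queue, which is the case in CPython but is not a documented API. The names PeekableQueue, consume_pairs and run_demo are hypothetical and only serve the illustration. The consumer removes one task with get() and then blocks in peek() until the producer has added the next one, so no polling with sleep is needed.

```python
import threading
import time
from queue import Queue, Empty  # Python 3 module name


class PeekableQueue(Queue):
    """Hypothetical Python 3 variant of the VoyeurQueue above."""

    def peek(self, block=True, timeout=None):
        # not_empty is a Condition that shares the queue's mutex, so the
        # "with" block holds the same lock that put() and get() use.
        with self.not_empty:
            if not block:
                if not self._qsize():
                    raise Empty
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                endtime = time.monotonic() + timeout
                while not self._qsize():
                    remaining = endtime - time.monotonic()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            # Read the head without removing it.
            return self.queue[0]


def consume_pairs(q, count):
    """Collect (removed item, current head) pairs without busy-waiting."""
    pairs = []
    for _ in range(count):
        removed = q.get()    # blocks until one task is available
        upcoming = q.peek()  # blocks until the producer adds another task
        pairs.append((removed, upcoming))
    return pairs


def run_demo():
    """Run one consumer thread against four tasks put by the main thread."""
    q = PeekableQueue()
    results = []
    consumer = threading.Thread(
        target=lambda: results.extend(consume_pairs(q, 3))
    )
    consumer.start()
    for task in range(4):
        q.put(task)
    consumer.join()
    return results
```

With a single consumer, the item returned by peek() is guaranteed to be the one the next get() returns; with several consumers another thread could remove it in between.
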
Lav answered May 03 '23 22:05


You can use a semaphore, initialized to zero, in parallel with the queue, for example mySemaphore = threading.Semaphore(0). A thread calling mySemaphore.acquire() blocks while the semaphore's count is zero, without touching the queue. Then, whenever you put something in the queue, call mySemaphore.release(), which lets one waiting thread proceed (until its next loop iteration, I suppose).
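
A rough sketch of that approach, assuming Python 3 and one producer and one consumer as in the question. The function name run_semaphore_demo and the variable names are made up for the example. Each put() is followed by one release(), so a successful acquire() means at least one item is already in the queue.

```python
import threading
from queue import Queue


def run_semaphore_demo(values):
    """Pass values from a producer thread to a consumer thread, in order."""
    tasks = Queue()
    permits = threading.Semaphore(0)  # one permit per item put and not yet claimed
    received = []

    def producer():
        for value in values:
            tasks.put(value)
            permits.release()  # signal that one more item is available

    def consumer():
        for _ in values:
            permits.acquire()  # blocks (no spinning) while the count is zero
            # The matching put() happened before release(), so this cannot
            # raise Empty when there is a single consumer.
            received.append(tasks.get_nowait())

    threads = [
        threading.Thread(target=consumer),
        threading.Thread(target=producer),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return received
```

The waiting happens on the semaphore rather than on the queue, so the consumer decides separately when to actually remove an item.
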

Antoine answered May 03 '23 23:05