 

How to make Python's multiprocessing Queue's .empty() method return the correct value? Or alternatives?

I have this snippet that uses the Queue class from the multiprocessing module. I am very confused that the .empty() method of a Queue instance does not return the value I would expect. This is my code:

from time import sleep
from multiprocessing import Queue, Lock

foo = Queue()
locker = Lock()

with locker:  # even with this, still True
    foo.put("bar")

print(foo.empty())  # True, obviously not
print(foo.empty())  # True
print(foo.empty())  # True
print(foo.qsize())  # 1L
print(foo.empty())  # True

However, if I use the sleep function from time to introduce a short delay before checking, it works:

from time import sleep
from multiprocessing import Queue, Lock

foo = Queue()
locker = Lock()

foo.put("bar")

sleep(0.01)

print(foo.empty())  # False
print(foo.empty())  # False
print(foo.empty())  # False
print(foo.qsize())  # 1L
print(foo.empty())  # False

I know one alternative is the .qsize() > 0 expression, but I am sure I am just doing this the wrong way.

What am I doing wrong?

*EDIT*

I understand now that it is unreliable, thank you @Mathias Ettinger. Are there any clean alternatives? I need to know how to reliably tell whether my Queue is empty or not.

A. K. Tolentino asked Sep 04 '15 09:09



1 Answer

Unfortunately, the Queue's complex implementation means that .empty() and .qsize() check different things to make their judgments, so they may disagree for a while, as you've seen.

Since .qsize() is supported on your platform (which is not true everywhere), you can re-implement the .empty() check in terms of .qsize(), and this will work for you:

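# multiprocessing.Queue() is a factory function, not a class, so we need to
# find the true class to subclass
import multiprocessing
import multiprocessing.queues

class XQueue(multiprocessing.queues.Queue):
    def __init__(self, maxsize=0):
        # On Python 3.4+ the real class requires an explicit context argument
        super(XQueue, self).__init__(maxsize, ctx=multiprocessing.get_context())

    def empty(self):
        try:
            return self.qsize() == 0
        except NotImplementedError:  # macOS -- see qsize() implementation
            return super(XQueue, self).empty()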
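If you would rather not subclass a class from multiprocessing.queues (its constructor signature changed in Python 3.4, when contexts were introduced), the same qsize()-based check can be written as a plain helper that works on any multiprocessing.Queue. This is a minimal sketch; the name queue_is_empty is hypothetical, and the fallback to .empty() on platforms without qsize() (such as macOS) still has the original timing issue.

```python
import multiprocessing


def queue_is_empty(q):
    """Hypothetical helper: emptiness check based on qsize().

    qsize() counts an item as soon as put() returns, because put() acquires
    the queue's semaphore synchronously; empty() only sees items that the
    feeder thread has already written into the pipe.
    """
    try:
        return q.qsize() == 0
    except NotImplementedError:  # macOS: sem_getvalue() is unavailable
        return q.empty()


if __name__ == "__main__":
    q = multiprocessing.Queue()
    q.put("bar")
    print(queue_is_empty(q))  # False on platforms where qsize() is supported
```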

Under the hood, Queue.put() is a complex process: the Queue places objects in a buffer and acquires an interprocess semaphore, while a hidden daemon thread is responsible for draining the buffer and serializing its contents to a pipe. (Consumers then .get() by reading from this pipe and releasing the interprocess semaphore.) So, that's why sleeping in your example works: the daemon thread has enough time to move the object from the in-memory buffer into the pipe before you call .empty().
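To make that mechanism concrete, here is a deliberately simplified, single-process toy model. ToyQueue is a hypothetical class, not the real multiprocessing implementation: it uses an in-process counter in place of the interprocess semaphore and adds an artificial delay in the feeder thread so the window between put() and the pipe write is easy to observe. qsize() reads the counter, while empty() polls the pipe, which is why the two can disagree.

```python
import collections
import threading
import time
from multiprocessing import Pipe


class ToyQueue:
    """Simplified model of the multiprocessing.Queue put/get path (NOT the real code)."""

    def __init__(self):
        self._reader, self._writer = Pipe(duplex=False)
        self._buffer = collections.deque()  # in-memory buffer filled by put()
        self._notempty = threading.Condition()
        self._count = 0  # stands in for the interprocess semaphore
        threading.Thread(target=self._feed, daemon=True).start()

    def put(self, obj):
        with self._notempty:
            self._count += 1          # counted at once, like acquiring the semaphore
            self._buffer.append(obj)  # but the object is not in the pipe yet
            self._notempty.notify()

    def _feed(self):
        # Feeder thread: drain the buffer and write each object into the pipe.
        while True:
            with self._notempty:
                while not self._buffer:
                    self._notempty.wait()
                obj = self._buffer.popleft()
            time.sleep(0.005)       # artificial delay to widen the race window
            self._writer.send(obj)  # pickles obj and writes it to the pipe

    def get(self):
        obj = self._reader.recv()  # read from the pipe
        with self._notempty:
            self._count -= 1       # like releasing the semaphore
        return obj

    def qsize(self):
        return self._count  # reflects put() as soon as it returns

    def empty(self):
        return not self._reader.poll()  # only sees data that has reached the pipe


if __name__ == "__main__":
    q = ToyQueue()
    q.put("bar")
    print(q.qsize(), q.empty())  # typically "1 True": the feeder has not written yet
    time.sleep(0.1)
    print(q.qsize(), q.empty())  # "1 False" once the feeder has written to the pipe
```

The real Queue behaves the same way in principle, only without the artificial delay, so the window is much shorter and the disagreement is intermittent.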

As an aside, I find this behavior surprising: a Queue in the very same internal state can give two different answers to the question, "do you have any elements enqueued?" (qsize will say "yes", and empty "no".)

I think I understand how this came about. Since not all platforms support sem_getvalue(), not all platforms can implement qsize, but empty can be reasonably implemented by just polling the FIFO. I'd have expected empty to be implemented in terms of qsize on platforms that support the latter.

pilcrow answered Oct 05 '22 10:10
