I have this snippet that uses the Queue
class from the multiprocess
module. I am very confused that the .empty()
method of an instance of Queue
does not give me a correct value as i would expect. This is my code:
from time import sleep
from multiprocessing import Queue, Lock
foo = Queue()
locker = Lock()
with locker: # even with this, still True
foo.put("bar")
print(foo.empty()) # True, obviously not
print(foo.empty()) # True
print(foo.empty()) # True
print(foo.qsize()) # 1L
print(foo.empty()) # True
However, if i use the sleep
function from time
, as in cause a chronological delay in the execution. It works.
from time import sleep
from multiprocessing import Queue, Lock
foo = Queue()
locker = Lock()
foo.put("bar")
sleep(0.01)
print(foo.empty()) # False
print(foo.empty()) # False
print(foo.empty()) # False
print(foo.qsize()) # 1L
print(foo.empty()) # False
I know my alternative is the .qsize() > 0
expression, but i am sure that i just doing this in a wrong way.
What am i doing wrong?
*EDIT*
I understand now that is it unreliable, thank you @Mathias Ettinger. Any clean alternatives? I need to know hot to reliably tell if my Queue
is empty or not.
Unfortunately, the Queue's complex implementation that means that .empty()
and .qsize()
check different things to make their judgments. That means that they may disagree for a while, as you've seen.
Since .qsize()
is supported on your platform (which is not true everywhere), you can re-implement the .empty()
check in terms of .qsize()
, and this will work for you:
# mp.Queue() is a function, not a class, so we need to find the true class
# to subclass
import multiprocessing.queues
class XQueue(multiprocessing.queues.Queue):
def empty(self):
try:
return self.qsize() == 0
except NotImplementedError: # OS X -- see qsize() implementation
return super(XQueue, self).empty()
Under the hood, the Queue .put()
is a complex process: the Queue places objects in a buffer and acquires an interprocess semaphore, while a hidden daemon thread is responsible for draining the buffer and serializing its contents to a pipe. (Consumers then .get()
by reading from this pipe and releasing the interprocess semaphore.) So, that's why sleeping in your example works: the daemon thread has enough time to move the object from in-memory buffer to I/O representation before you call .empty()
.
As an aside, I find this behavior surprising: a Queue in the very same internal state can give two different answers to the question, "do you have any elements enqueued?" (qsize
will say "yes", and empty
"no".)
I think I understand how this came about. Since not all platforms support sem_getvalue()
, not all platforms can implement qsize
, but empty
can be reasonably implemented by just polling the FIFO. I'd have expected empty
to be implemented in terms of qsize
on platforms that support the latter.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With