Straight from the Python docs:
class multiprocessing.Queue([maxsize])
...
qsize() Return the approximate size of the queue. Because of multithreading/multiprocessing semantics, this number is not reliable.
empty() Return True if the queue is empty, False otherwise. Because of multithreading/multiprocessing semantics, this is not reliable.
And I've empirically found this to be quite true for Queue, especially for empty().
In my code I have a bunch of processes (each a child of the same master process), each of which has the following in its run method:
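import queue  # Python 3 name; in Python 2 the module is Queue (Queue.Empty)

while self.active:
    if(self.exclusive_queue.empty() and self.exclusive_queue.qsize() == 0):
        try:
            # Nothing exclusive pending: wait for shared work and move it
            # into this process's exclusive queue.
            self.exclusive_queue.put(self.general_queue.get(timeout=self.queue_timeout))
        except queue.Empty:
            # Timed out waiting on general_queue; loop and check again.
            continue
    else:
        task = self.exclusive_queue.get()
        self.compute(task)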
Basically, the process waits on the general_queue for work, but first checks its exclusive_queue. The master process can put tasks either in the general queue or in the exclusive queue of a specific process. In the if(self.exclusive_queue.empty() and self.exclusive_queue.qsize() == 0) condition, I originally used only self.exclusive_queue.empty(), which resulted in quite weird behaviour (a qsize() of 30+ while empty() returned True).
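One way to avoid the question altogether is never to ask the queue whether it is empty and to let get() tell you instead: a non-blocking get_nowait() on the exclusive queue, then a timed get() on the general queue, catching queue.Empty each time. The sketch below assumes a hypothetical helper next_task() and that None is never a legitimate task; it is one possible restructuring, not the original author's code.

```python
import queue


def next_task(exclusive_queue, general_queue, timeout):
    """Return the next task, preferring the process's exclusive queue.

    Hypothetical helper: emptiness is detected by get() raising queue.Empty,
    never by calling empty() or qsize().
    Returns None if no task arrived on either queue within `timeout` seconds.
    """
    try:
        # Non-blocking: take an exclusive task if one is already available.
        return exclusive_queue.get_nowait()
    except queue.Empty:
        pass
    try:
        # Otherwise wait (for at most `timeout` seconds) for shared work.
        return general_queue.get(timeout=timeout)
    except queue.Empty:
        return None
```

Inside run() the loop would then be roughly task = next_task(self.exclusive_queue, self.general_queue, self.queue_timeout), followed by self.compute(task) whenever task is not None. With multiprocessing.Queue, an item that another process has only just put may not yet be visible to get_nowait() (the multiprocessing docs mention this short delay), but it is not lost: a later iteration picks it up.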
So here is where I'm heading with this: for multiprocessing.queues.SimpleQueue, the docs say:
empty() Return True if the queue is empty, False otherwise.
with no mention of reliability at all. Is SimpleQueue.empty() reliable?
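A small single-process check hints at one practical difference, based on CPython's implementation rather than on a documented guarantee: SimpleQueue.put() writes the pickled object to the underlying pipe before returning, whereas Queue.put() hands it to a background feeder thread, so Queue.empty() can briefly report True after a put(). Either way, empty() is only a snapshot: with several processes, another one may take or add an item between your empty() call and your next get(). The function name below is illustrative.

```python
import multiprocessing as mp


def simple_queue_snapshot():
    """Put one item on a SimpleQueue and record what empty() reports.

    Single process only; this says nothing about races with other processes.
    """
    q = mp.SimpleQueue()
    before = q.empty()       # nothing has been written to the pipe yet
    q.put("task")            # in CPython this writes straight to the pipe
    after_put = q.empty()    # so the data is already visible here
    item = q.get()
    after_get = q.empty()
    return before, after_put, item, after_get


if __name__ == "__main__":
    print(simple_queue_snapshot())
```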
Second, is multiprocessing.JoinableQueue reliable, or at least "more" reliable than Queue, because of the task_done() mechanism?
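As far as the implementation goes, JoinableQueue inherits empty() and qsize() from Queue unchanged, so task_done() does not make them any more accurate. What it adds is join(), which blocks until every item that was put() has been matched by a task_done() call; that answers "has all the work been finished?" rather than "is the queue empty right now?". A minimal sketch of the usual pattern follows; consumer(), bookkeeping_demo() and the squaring workload are illustrative assumptions.

```python
import multiprocessing as mp


def consumer(tasks, results):
    """Hypothetical worker: square numbers until the process is terminated."""
    while True:
        n = tasks.get()
        results.put(n * n)
        tasks.task_done()  # mark this item as fully processed


def bookkeeping_demo(tasks):
    """Single-process illustration of the put/task_done/join accounting."""
    for n in (1, 2, 3):
        tasks.put(n)
    for _ in range(3):
        tasks.get()
        tasks.task_done()
    tasks.join()  # returns at once: every put() has a matching task_done()
    try:
        tasks.task_done()  # one call too many
    except ValueError:
        return "balanced"
    return "unbalanced"


if __name__ == "__main__":
    tasks, results = mp.JoinableQueue(), mp.Queue()
    workers = [mp.Process(target=consumer, args=(tasks, results), daemon=True)
               for _ in range(3)]
    for w in workers:
        w.start()
    for n in range(10):
        tasks.put(n)
    tasks.join()  # blocks until all 10 items have been marked done
    print(sorted(results.get() for _ in range(10)))
```

The daemonic workers are terminated automatically when the main process exits, which is why consumer() can loop forever.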
Can such an approach be considered correct, or would an approach with callbacks (via a shared pipe endpoint between the children) be more appropriate?
Not a direct answer, but I've started to rely more and more on iterating over the input queue with a guard (sentinel) value. There is an example in the documentation of the multiprocessing module:
def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)
So once all your input has been put on the queue, you simply put as many STOP strings (or whatever guard value you choose) on the queue as you have started processes.
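A fuller, runnable version of that pattern might look like the sketch below. calculate() is a hypothetical stand-in (the docs example defines its own), and run() and the operator-based tasks are illustrative assumptions. Each worker stops after consuming exactly one "STOP", so putting one sentinel per process shuts all of them down without ever calling empty() or qsize().

```python
import multiprocessing as mp
import operator

SENTINEL = "STOP"  # any value that can never be a real task


def calculate(func, args):
    # Hypothetical stand-in for the docs' calculate(): just apply func.
    return func(*args)


def worker(input, output):
    # iter(callable, sentinel) keeps calling input.get() until it returns
    # SENTINEL, so the loop ends without consulting empty() or qsize().
    for func, args in iter(input.get, SENTINEL):
        output.put(calculate(func, args))


def run(tasks, n_workers=4):
    task_queue, done_queue = mp.Queue(), mp.Queue()
    procs = [mp.Process(target=worker, args=(task_queue, done_queue))
             for _ in range(n_workers)]
    for p in procs:
        p.start()
    for task in tasks:
        task_queue.put(task)
    # One sentinel per worker: each worker consumes exactly one and exits.
    for _ in range(n_workers):
        task_queue.put(SENTINEL)
    # Drain results before join() so no child blocks flushing done_queue.
    results = [done_queue.get() for _ in range(len(tasks))]
    for p in procs:
        p.join()
    return results


if __name__ == "__main__":
    tasks = ([(operator.mul, (i, 7)) for i in range(10)]
             + [(operator.add, (i, 8)) for i in range(10)])
    print(sorted(run(tasks)))
```

Results are read from done_queue before the processes are joined because the multiprocessing programming guidelines warn that joining a process which still has items buffered for a queue can deadlock.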