Python multiprocessing queue reliability: Queue vs SimpleQueue vs JoinableQueue

Straight from the Python docs:

class multiprocessing.Queue([maxsize])

...

qsize()
    Return the approximate size of the queue. Because of multithreading/multiprocessing semantics, this number is not reliable.

empty()
    Return True if the queue is empty, False otherwise. Because of multithreading/multiprocessing semantics, this is not reliable.

And I've empirically found this to be quite true for Queue, especially for empty().

In my code I have a bunch of processes (each a child of the same master process), each with the following in its run method:

while self.active:
    # Prefer work from the exclusive queue; otherwise pull one task
    # from the general queue into the exclusive queue.
    if self.exclusive_queue.empty() and self.exclusive_queue.qsize() == 0:
        try:
            self.exclusive_queue.put(self.general_queue.get(timeout=self.queue_timeout))
        except Queue.Empty:  # requires `import Queue` (Python 2); `from queue import Empty` in Python 3
            continue
    else:
        task = self.exclusive_queue.get()
        self.compute(task)

Basically, the process waits on the general_queue for work, but first checks its exclusive_queue. The master process can put tasks either in the general queue or in a specific process's exclusive queue. Now, in the condition self.exclusive_queue.empty() and self.exclusive_queue.qsize() == 0, I originally used only self.exclusive_queue.empty(), which resulted in quite weird behaviour: qsize() reported 30+ items while empty() returned True.

So where I'm heading with this is: for multiprocessing.queues.SimpleQueue, the docs say only:

empty()
    Return True if the queue is empty, False otherwise.

with no mention of reliability at all. Is SimpleQueue.empty() reliable?
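
For context, SimpleQueue is a much thinner wrapper than Queue: it exposes only put(), get() and empty(), with no qsize(), no maxsize and no timeouts. A minimal sketch of that surface (Python 3 spelling; on Python 2 the class lives at multiprocessing.queues.SimpleQueue):

from multiprocessing import Process, SimpleQueue

def drain(q):
    # SimpleQueue offers only put(), get() and empty() --
    # no qsize(), no maxsize, no get(timeout=...).
    while not q.empty():
        print(q.get())

if __name__ == '__main__':
    q = SimpleQueue()
    for i in range(5):
        q.put(i)  # the producer finishes before the consumer starts
    p = Process(target=drain, args=(q,))
    p.start()
    p.join()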

And second, is multiprocessing.JoinableQueue reliable, or "more" reliable than Queue, because of its task_done() mechanism?
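
For reference, task_done()/join() track completion of items rather than queue emptiness: they let the producer block until every item that was put() has been acknowledged. A minimal sketch of that mechanism (the print is a stand-in for real work):

from multiprocessing import Process, JoinableQueue

def worker(q):
    while True:
        task = q.get()
        try:
            print('processing', task)  # stand-in for real work
        finally:
            q.task_done()  # acknowledge this item back to the queue

if __name__ == '__main__':
    q = JoinableQueue()
    for i in range(10):
        q.put(i)
    p = Process(target=worker, args=(q,))
    p.daemon = True  # the endless worker dies with the main process
    p.start()
    q.join()  # blocks until every put() has a matching task_done()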

Can such an approach be considered correct, or would an approach with callbacks (via a shared pipe endpoint between the children) be more appropriate?
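
One reading of that callback idea, sketched minimally: a dedicated Pipe between the master and each child, so a blocking recv() replaces the empty()/qsize() checks entirely (the doubling is a hypothetical stand-in for compute(); the names are made up):

from multiprocessing import Process, Pipe

def worker(conn):
    while True:
        task = conn.recv()   # blocks until the master sends work
        if task is None:     # sentinel: no more work
            break
        conn.send(task * 2)  # hypothetical stand-in for compute(task)

if __name__ == '__main__':
    parent_end, child_end = Pipe()
    p = Process(target=worker, args=(child_end,))
    p.start()
    for i in range(3):
        parent_end.send(i)
        print(parent_end.recv())
    parent_end.send(None)  # tell the worker to shut down
    p.join()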

asked Feb 07 '13 by dmg

1 Answer

Not a direct answer, but I've come to rely more and more on iterating over the input queue with a sentinel value as a guard. There is an example in the documentation of the multiprocessing module:

def worker(input, output):
    # Consume tasks until the 'STOP' sentinel appears on the queue.
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)  # calculate() is defined elsewhere in the docs example
        output.put(result)

So when your input to the queue is complete, you simply put as many 'STOP' strings (or whatever sentinel you choose) into the queue as you have started processes.
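
A fuller, runnable sketch of that pattern, assuming calculate() simply applies func to args as in the docs example (the worker count and tasks are made up for illustration):

from multiprocessing import Process, Queue

NUM_WORKERS = 4
STOP = 'STOP'  # the sentinel; any value you can compare for equality works

def calculate(func, args):
    return func(*args)

def worker(input, output):
    for func, args in iter(input.get, STOP):
        output.put(calculate(func, args))

if __name__ == '__main__':
    tasks = [(pow, (2, n)) for n in range(10)]
    input_q, output_q = Queue(), Queue()

    workers = [Process(target=worker, args=(input_q, output_q))
               for _ in range(NUM_WORKERS)]
    for w in workers:
        w.start()

    for t in tasks:
        input_q.put(t)
    for _ in workers:        # one sentinel per started process
        input_q.put(STOP)

    results = [output_q.get() for _ in tasks]  # drain before join()
    for w in workers:
        w.join()
    print(sorted(results))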

answered Sep 20 '22 by Midnighter