Checking for empty Queue in python's multiprocessing

I have a program that uses Python's multiprocessing and Queue modules. One of my functions has this structure:

from multiprocessing import Process, Queue

def foo(queue):
    while True:
        try:
            a = queue.get(block=False)
            doAndPrintStuff(a)
        except:
            print "the end"
            break

if __name__ == "__main__":
    nthreads = 4
    queue = Queue()
    # put stuff in the queue here
    for stuff in moreStuff:
        queue.put(stuff)
    procs = [Process(target=foo, args=(queue,)) for i in xrange(nthreads)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

The idea is that when I try to get an item from the queue and it is empty, the call raises an exception and terminates the loop. So I have two questions:

1) Is this a safe idiom? Are there better ways to do this?

2) I tried to find out exactly which exception is raised when I call .get() on an empty queue. Currently my program catches all exceptions, which sucks when the error is somewhere else and I only get a "the end" message.

I tried:

import Queue
queue = Queue.Queue()
[queue.put(x) for x in xrange(10)]
try:
    print queue.get(block=False)
except Queue.Empty:
    print "end"
    break

but I got the error as if I hadn't caught the exception. What's the correct exception to catch?

Rafael S. Calsaverini asked Aug 18 '11 14:08



2 Answers

The exception should be Queue.Empty. But are you sure you got the same error? In your second example, you also switched the queue itself from multiprocessing.Queue to Queue.Queue, which I think may be the problem.

It might seem strange, but you have to use the multiprocessing.Queue class together with the Queue.Empty exception, which you have to import yourself from the Queue module.
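As a rough illustration of that combination, here is a minimal sketch written for Python 3, where the Queue module is named queue (so the exception is queue.Empty). The helper name drain and its timeout parameter are made up for this example. It uses a short timeout rather than block=False, because items put on a multiprocessing.Queue may not be visible to get() right away.

```python
import multiprocessing
import queue  # this module is called "Queue" in Python 2


def drain(q, timeout=1.0):
    """Collect items from q until it stays empty for `timeout` seconds.

    `drain` is a hypothetical helper for this sketch, not a library function.
    """
    items = []
    while True:
        try:
            items.append(q.get(timeout=timeout))
        except queue.Empty:
            # multiprocessing.Queue raises the Empty exception defined in
            # the standard queue module, so that is the name to catch.
            return items


if __name__ == "__main__":
    q = multiprocessing.Queue()
    for n in range(5):
        q.put(n)
    print(drain(q))
```

Catching only queue.Empty means that an error raised inside the loop body for any other reason still surfaces as a normal traceback instead of being swallowed.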

Steven answered Oct 18 '22 14:10


It appears that a multiprocessing Queue can look empty until the put buffers are flushed, which may take a while.

The solution to our problem is to use sentinels, or maybe the built-in task_done() call:

task_done()

Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.
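One way these two suggestions might fit together is sketched below, for Python 3. It assumes multiprocessing.JoinableQueue, which is the multiprocessing class that provides task_done() and join(); the plain multiprocessing.Queue does not. The names SENTINEL, worker, tasks and results are made up for the example, and squaring each item stands in for the real work.

```python
import multiprocessing

SENTINEL = None  # assumption for this sketch: None is never a real task


def worker(tasks, results):
    """Handle tasks until the sentinel arrives, then exit."""
    while True:
        item = tasks.get()  # blocking get: no polling, no Empty to catch
        try:
            if item is SENTINEL:
                break
            results.put(item * item)  # placeholder for the real work
        finally:
            # Acknowledge every get(), including the sentinel, so that
            # tasks.join() can return.
            tasks.task_done()


if __name__ == "__main__":
    nprocs = 4
    work = list(range(10))
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    for item in work:
        tasks.put(item)
    for _ in range(nprocs):
        tasks.put(SENTINEL)  # one sentinel per worker process
    procs = [multiprocessing.Process(target=worker, args=(tasks, results))
             for _ in range(nprocs)]
    for p in procs:
        p.start()
    # Read the results before joining, so no worker is left blocked
    # while flushing its output.
    squares = sorted(results.get() for _ in work)
    tasks.join()  # returns once every put() has a matching task_done()
    for p in procs:
        p.join()
    print(squares)
```

Because each worker stops on an explicit sentinel rather than on an empty queue, a queue that only looks empty while buffers are being flushed cannot end a worker early.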

Cees Timmerman answered Oct 18 '22 12:10