I have a program using Python's multiprocessing and Queue packages. One of my functions has this structure:
from multiprocessing import Process, Queue

def foo(queue):
    while True:
        try:
            a = queue.get(block=False)
            doAndPrintStuff(a)
        except:
            print "the end"
            break
if __name__ == "__main__":
    nthreads = 4
    queue = Queue()
    # put stuff in the queue here
    for stuff in moreStuff:
        queue.put(stuff)
    procs = [Process(target=foo, args=(queue,)) for i in xrange(nthreads)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
The idea is that when I try to extract from the queue and it is empty, an exception is raised and the loop terminates. So I have two questions:

1) Is this a safe idiom? Are there better ways to do this?
2) I tried to find out which exact exception is raised when I .get() from an empty queue. Currently my program catches all exceptions, which is bad when the error is somewhere else and all I get is a "the end" message.
I tried:

import Queue
queue = Queue.Queue()
[queue.put(x) for x in xrange(10)]
try:
    print queue.get(block=False)
except Queue.Empty:
    print "end"
    break
but I got the error as if I hadn't caught the exception. What's the correct exception to catch?
The exception should be Queue.Empty. But are you sure you got the same error? In your second example, you also switched the queue itself from multiprocessing.Queue to Queue.Queue, which I think may be the problem.

It might seem strange, but you have to use the multiprocessing.Queue class while catching the Queue.Empty exception (which you have to import yourself from the Queue module).
It appears that the queue registers as empty until the put buffers are flushed, which may take a while. A non-blocking get() can therefore raise Queue.Empty even though more items are on the way.
The solution to our problem is to use sentinels, or maybe the built-in task_done() call (note that in multiprocessing, task_done() is provided by multiprocessing.JoinableQueue, not the plain Queue; a sentinel sketch follows the quoted documentation below):
task_done()
Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
Raises a ValueError if called more times than there were items placed in the queue.
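For the multiprocessing case, here is a minimal sketch of the sentinel approach. It reuses moreStuff and doAndPrintStuff from the question; the SENTINEL value and the one-sentinel-per-consumer convention are illustrative choices, not anything built into the library:

from multiprocessing import Process, Queue

SENTINEL = None  # any value the real work items can never take

def worker(queue):
    while True:
        item = queue.get()        # blocking get: no race with slow put buffers
        if item is SENTINEL:      # sentinel means "no more work"
            break
        doAndPrintStuff(item)

if __name__ == "__main__":
    nprocs = 4
    queue = Queue()
    for stuff in moreStuff:
        queue.put(stuff)
    for _ in xrange(nprocs):
        queue.put(SENTINEL)       # one sentinel per consumer process
    procs = [Process(target=worker, args=(queue,)) for _ in xrange(nprocs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

Because each worker consumes exactly one sentinel and then exits, all processes terminate cleanly once the real items are drained, without relying on a non-blocking get().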