I wish to dump a multiprocessing.Queue
into a list. For that task I've written the following function:
import Queue def dump_queue(queue): """ Empties all pending items in a queue and returns them in a list. """ result = [] # START DEBUG CODE initial_size = queue.qsize() print("Queue has %s items initially." % initial_size) # END DEBUG CODE while True: try: thing = queue.get(block=False) result.append(thing) except Queue.Empty: # START DEBUG CODE current_size = queue.qsize() total_size = current_size + len(result) print("Dumping complete:") if current_size == initial_size: print("No items were added to the queue.") else: print("%s items were added to the queue." % \ (total_size - initial_size)) print("Extracted %s items from the queue, queue has %s items \ left" % (len(result), current_size)) # END DEBUG CODE return result
But for some reason it doesn't work.
Observe the following shell session:
>>> import multiprocessing >>> q = multiprocessing.Queue() >>> for i in range(100): ... q.put([range(200) for j in range(100)]) ... >>> q.qsize() 100 >>> l=dump_queue(q) Queue has 100 items initially. Dumping complete: 0 items were added to the queue. Extracted 1 items from the queue, queue has 99 items left >>> l=dump_queue(q) Queue has 99 items initially. Dumping complete: 0 items were added to the queue. Extracted 3 items from the queue, queue has 96 items left >>> l=dump_queue(q) Queue has 96 items initially. Dumping complete: 0 items were added to the queue. Extracted 1 items from the queue, queue has 95 items left >>>
What's happening here? Why aren't all the items being dumped?
We can print the contents of the multiprocessing using the get() method, empty() method and the print() function. We will check if the multiprocessing queue is empty or not using the empty() method. If the queue is not empty, we will extract an element from the queue using the get() method and print the result.
To clear all items from a queue in Python:queue. clear() . The clear method will remove all elements from the queue.
Queue implementation of queues in Python, one can use 'queue()' method to directly cast/convert the items in the queue to a list/array. Time Complexity: O(n), as the explicit conversions of elements in a queue directly into list/array takes O(n) time.
Yes, it is. From https://docs.python.org/3/library/multiprocessing.html#exchanging-objects-between-processes: Queues are thread and process safe.
Try this:
import Queue import time def dump_queue(queue): """ Empties all pending items in a queue and returns them in a list. """ result = [] for i in iter(queue.get, 'STOP'): result.append(i) time.sleep(.1) return result import multiprocessing q = multiprocessing.Queue() for i in range(100): q.put([range(200) for j in range(100)]) q.put('STOP') l=dump_queue(q) print len(l)
Multiprocessing queues have an internal buffer which has a feeder thread which pulls work off a buffer and flushes it to the pipe. If not all of the objects have been flushed, I could see a case where Empty is raised prematurely. Using a sentinel to indicate the end of the queue is safe (and reliable). Also, using the iter(get, sentinel) idiom is just better than relying on Empty.
I don't like that it could raise empty due to flushing timing (I added the time.sleep(.1) to allow a context switch to the feeder thread, you may not need it, it works without it - it's a habit to release the GIL).
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With