
Dumping a multiprocessing.Queue into a list

I wish to dump a multiprocessing.Queue into a list. For that task I've written the following function:

    import Queue

    def dump_queue(queue):
        """
        Empties all pending items in a queue and returns them in a list.
        """
        result = []

        # START DEBUG CODE
        initial_size = queue.qsize()
        print("Queue has %s items initially." % initial_size)
        # END DEBUG CODE

        while True:
            try:
                thing = queue.get(block=False)
                result.append(thing)
            except Queue.Empty:

                # START DEBUG CODE
                current_size = queue.qsize()
                total_size = current_size + len(result)
                print("Dumping complete:")
                if current_size == initial_size:
                    print("No items were added to the queue.")
                else:
                    print("%s items were added to the queue." % \
                          (total_size - initial_size))
                print("Extracted %s items from the queue, queue has %s items \
                left" % (len(result), current_size))
                # END DEBUG CODE

                return result

But for some reason it doesn't work.

Observe the following shell session:

    >>> import multiprocessing
    >>> q = multiprocessing.Queue()
    >>> for i in range(100):
    ...     q.put([range(200) for j in range(100)])
    ...
    >>> q.qsize()
    100
    >>> l=dump_queue(q)
    Queue has 100 items initially.
    Dumping complete:
    0 items were added to the queue.
    Extracted 1 items from the queue, queue has 99 items left
    >>> l=dump_queue(q)
    Queue has 99 items initially.
    Dumping complete:
    0 items were added to the queue.
    Extracted 3 items from the queue, queue has 96 items left
    >>> l=dump_queue(q)
    Queue has 96 items initially.
    Dumping complete:
    0 items were added to the queue.
    Extracted 1 items from the queue, queue has 95 items left
    >>>

What's happening here? Why aren't all the items being dumped?

Ram Rachum asked Oct 08 '09 22:10





1 Answer

Try this:

    import Queue
    import time

    def dump_queue(queue):
        """
        Empties all pending items in a queue and returns them in a list.
        """
        result = []

        for i in iter(queue.get, 'STOP'):
            result.append(i)
        time.sleep(.1)
        return result

    import multiprocessing
    q = multiprocessing.Queue()
    for i in range(100):
        q.put([range(200) for j in range(100)])
    q.put('STOP')
    l=dump_queue(q)
    print len(l)

A multiprocessing queue keeps an internal buffer, and a feeder thread pulls objects off that buffer and flushes them to the underlying pipe. If not all of the objects have been flushed yet, I could see a case where Empty is raised prematurely. Using a sentinel to mark the end of the queue is safe and reliable. Also, the iter(get, sentinel) idiom is simply better than relying on Empty.
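The code above targets Python 2. For readers on Python 3, here is a minimal sketch of the same sentinel idea. The names SENTINEL and demo are made up for illustration, and the sketch assumes a single producer that puts the sentinel last:

```python
import multiprocessing

# Hypothetical sentinel: any picklable value that can never be real data.
SENTINEL = "STOP"


def dump_queue(queue, sentinel=SENTINEL):
    """Block until the sentinel arrives and return every item before it."""
    # iter(callable, sentinel) calls queue.get() repeatedly and stops
    # as soon as the returned item compares equal to the sentinel.
    return list(iter(queue.get, sentinel))


def demo():
    """Fill a queue from this process, mark its end, then dump it."""
    q = multiprocessing.Queue()
    for _ in range(100):
        q.put(list(range(200)))
    q.put(SENTINEL)  # marks the end of the data
    return dump_queue(q)


if __name__ == "__main__":
    print(len(demo()))  # prints 100
```

Because queue.get() blocks here, the dump waits for the feeder thread instead of racing it. With several producers, each one would need its own sentinel and the consumer would have to count them, which this sketch does not cover.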

I don't like that get can raise Empty because of flushing timing. (I added the time.sleep(.1) to allow a context switch to the feeder thread. You may not need it, since the code works without it; releasing the GIL is just a habit of mine.)
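If a sentinel is not an option, one alternative (not part of the original answer, and only a heuristic) is to give get a short timeout so the feeder thread has time to flush. This is a Python 3 sketch; the function name drain_with_timeout and the 0.5 second default are assumptions chosen for illustration:

```python
import multiprocessing
import queue  # Python 3 name of the Python 2 `Queue` module


def drain_with_timeout(q, timeout=0.5):
    """Collect items until no new item arrives within `timeout` seconds."""
    items = []
    while True:
        try:
            # Unlike get(block=False), this waits up to `timeout` seconds,
            # which gives the feeder thread a chance to flush its buffer.
            items.append(q.get(timeout=timeout))
        except queue.Empty:
            return items


if __name__ == "__main__":
    q = multiprocessing.Queue()
    for i in range(50):
        q.put(i)
    print(len(drain_with_timeout(q)))
```

A timeout only makes a premature Empty less likely. A slow producer can still exceed it, so the sentinel approach remains the more reliable choice.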

jnoller answered Sep 20 '22 14:09
