
Clean way to get near-LIFO behavior from multiprocessing.Queue? (or even just *not* near-FIFO)

Does anyone know a clean way to get near-LIFO or even not near-FIFO (e.g. random) behavior from multiprocessing.Queue?

Alternative Question: Could someone point me to the code for the thread that manages the actual storage structure behind multiprocessing.Queue? It seems like it would be trivial within that to provide approximately LIFO access, but I got lost in the rabbit hole trying to find it.

Notes:

  1. I believe multiprocessing.Queue does not guarantee order. Fine. But it is near-FIFO, so near-LIFO would be great.
  2. I could pull all the current items off the queue and reverse the order before working with them (roughly sketched below), but I prefer to avoid a kludge if possible.
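
For example (a rough sketch of the kludge, not code I'm actually running; drain_lifo is just an illustrative name):

import Queue                      # Python 2; supplies the Queue.Empty exception

def drain_lifo(q):
    # Pull everything currently available off a multiprocessing.Queue
    # and return it newest-first.
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except Queue.Empty:       # nothing (currently) left on the queue
            break
    items.reverse()               # newest item first
    return items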

(edit) To clarify: I am doing a CPU-bound simulation with multiprocessing and so can't use the specialized queues from the Queue module. Since I haven't seen any answers for a few days, I've added the alternative question above.


In case it is an issue, below is slight evidence that multiprocessing.Queue is near-FIFO. It just shows that in a simple case (a single thread), it is perfectly FIFO on my system:

import multiprocessing as mp
import Queue   # Python 2; provides the Queue.Empty exception used below

q = mp.Queue()

for i in xrange(1000):
    q.put(i)

deltas = []
while True:
    try:
        value1 = q.get(timeout=0.1)
        value2 = q.get(timeout=0.1)
        deltas.append(value2-value1)
    except Queue.Empty:
        break

#positive deltas would indicate the numbers are coming out in increasing order
min_delta, max_delta = min(deltas), max(deltas)
avg_delta = sum(deltas)/len(deltas)

print "min", min_delta
print "max", max_delta
print "avg", avg_delta

prints: min, max, and average are exactly 1 (perfect FIFO)

asked Aug 20 '12 by KobeJohn

2 Answers

I've looked over the Queue class that lives in Lib/multiprocessing/queues.py in my Python installation (Python 2.7, but nothing obvious is different in the version from Python 3.2 that I briefly checked). Here's how it works, as I understand it:

There are two sets of objects maintained by the Queue object. One set consists of multiprocess-safe primitives that are shared by all processes; the other is created and used separately by each process.

The cross-process objects are set up in the __init__ method:

  1. A Pipe object, whose two ends are saved as self._reader and self._writer.
  2. A BoundedSemaphore object, which counts (and optionally limits) how many objects are in the queue.
  3. A Lock object for reading the Pipe, and on non-Windows platforms another for writing. (I assume that this is because writing to a pipe is inherently multiprocess-safe on Windows.)

The per-process objects are set up in the _after_fork and _start_thread methods:

  1. A collections.deque object used to buffer writes to the Pipe.
  2. A threading.Condition object used to signal when the buffer is not empty.
  3. A threading.Thread object that does the actual writing. It is created lazily, so it won't exist until at least one write to the Queue has been requested in a given process.
  4. Various Finalize objects that clean stuff up when the process ends.

A get from the queue is pretty simple. You acquire the read lock, decrement the semaphore, and grab an object from the read end of the Pipe.

A put is more complicated. It uses multiple threads. The caller to put grabs the condition's lock, then adds its object to the buffer and signals the condition before unlocking it. It also increments the semaphore and starts up the writer thread if it isn't running yet.

The writer thread loops forever (until canceled) in the _feed method. If the buffer is empty, it waits on the notempty condition. Then it takes an item from the buffer, acquires the write lock (if it exists) and writes the item to the Pipe.
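
Putting those pieces together, here is a stripped-down paraphrase of the whole arrangement (the class name ToyQueue is mine, and this is not the stdlib source: it omits the fork handling, Finalize cleanup, timeouts, close/join_thread and error handling described above):

import collections
import sys
import threading
from multiprocessing import BoundedSemaphore, Lock, Pipe

class ToyQueue(object):

    def __init__(self, maxsize=1000):
        # Shared, cross-process pieces.
        self._reader, self._writer = Pipe(duplex=False)   # the underlying FIFO channel
        self._rlock = Lock()                              # serialises reads from the Pipe
        self._wlock = None if sys.platform == 'win32' else Lock()  # serialises writes
        self._sem = BoundedSemaphore(maxsize)             # tracks how many items are queued
        # Per-process pieces (the real Queue recreates these after a fork).
        self._buffer = collections.deque()                # objects waiting to be written
        self._notempty = threading.Condition()            # signalled when _buffer has data
        self._thread = None                               # writer thread, started lazily

    def get(self):
        with self._rlock:                 # only one process reads the Pipe at a time
            obj = self._reader.recv()     # next pickled object from the read end
        self._sem.release()               # free one slot (the queue now holds one fewer item)
        return obj

    def put(self, obj):
        self._sem.acquire()               # claim one slot (blocks if the queue is full)
        with self._notempty:
            if self._thread is None:      # lazily start the writer thread
                self._thread = threading.Thread(target=self._feed)
                self._thread.daemon = True
                self._thread.start()
            self._buffer.append(obj)      # hand the object to the buffer...
            self._notempty.notify()       # ...and wake the writer thread

    def _feed(self):
        # The writer thread: move buffered objects into the Pipe, oldest first.
        while True:
            with self._notempty:
                while not self._buffer:
                    self._notempty.wait()
            while self._buffer:
                obj = self._buffer.popleft()
                if self._wlock is None:   # Windows: Pipe writes need no lock
                    self._writer.send(obj)
                else:
                    with self._wlock:     # only one process writes at a time
                        self._writer.send(obj)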


So, given all of that, can you modify it to get a LIFO queue? It doesn't seem easy. Pipes are inherently FIFO objects, and while the Queue can't guarantee FIFO behavior overall (due to the asynchronous nature of the writes from multiple processes) it is always going to be mostly FIFO.

If you have only a single consumer, you could get objects from the queue and add them to your own process-local stack. It would be harder to do a multi-consumer stack, though with shared memory a bounded-size stack wouldn't be too hard. You'd need a lock, a pair of conditions (for blocking/signaling on full and empty states), a shared integer value (for the number of values held) and a shared array of an appropriate type (for the values themselves).
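
Here is a rough, untested sketch of that idea for int-valued items (the name BoundedStack and its interface are just illustrative, not an existing API):

import multiprocessing as mp

class BoundedStack(object):

    def __init__(self, maxsize):
        self._lock = mp.Lock()                        # guards all shared state below
        self._notempty = mp.Condition(self._lock)     # signalled after every push
        self._notfull = mp.Condition(self._lock)      # signalled after every pop
        self._count = mp.Value('i', 0, lock=False)    # number of stored values
        self._values = mp.Array('i', maxsize, lock=False)   # the values themselves

    def push(self, value):
        with self._lock:
            while self._count.value == len(self._values):
                self._notfull.wait()                  # block while the stack is full
            self._values[self._count.value] = value
            self._count.value += 1
            self._notempty.notify()

    def pop(self):
        with self._lock:
            while self._count.value == 0:
                self._notempty.wait()                 # block while the stack is empty
            self._count.value -= 1
            self._notfull.notify()
            return self._values[self._count.value]    # most recently pushed value

A stack like this would have to be created in the parent process and handed to the workers (for example as a Process argument) so they all share the same underlying memory, and unlike multiprocessing.Queue it only holds values of a single ctypes type.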

answered Sep 17 '22 by Blckknght

There is a LIFO queue in the Queue module (queue in Python 3). This isn't exposed in the multiprocessing or multiprocessing.queues modules.

Replacing your line q = mp.Queue() with q = Queue.LifoQueue() and running the script prints min, max, and average as exactly -1.
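
That is, keeping the imports already at the top of your test script (Python 2 names):

q = Queue.LifoQueue()   # was: q = mp.Queue()
# The rest of the test runs unchanged: LifoQueue.get(timeout=...) raises
# Queue.Empty when the queue is drained, just like multiprocessing.Queue.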

(Also I think you should always get exactly FIFO/LIFO order when getting items from only one thread.)

answered Sep 20 '22 by Walton