Does anyone know a clean way to get near-LIFO, or even just not near-FIFO (e.g. random), behavior from multiprocessing.Queue?
Alternative question: Could someone point me to the code for the thread that manages the actual storage structure behind multiprocessing.Queue? It seems like it would be trivial within that to provide approximately LIFO access, but I got lost in the rabbit hole trying to find it.
Notes:
multiprocessing.Queue does not guarantee order. Fine. But it is near-FIFO, so near-LIFO would be great.
(edit) To clarify: I am doing a CPU-bound simulation with multiprocessing and so can't use the specialized queues from Queue. Since I haven't seen any answers for a few days, I've added the alternative question above.
In case it is an issue, below is slight evidence that multiprocessing.Queue is near-FIFO. It just shows that in a simple case (a single thread), it is perfectly FIFO on my system:
import multiprocessing as mp
import Queue

q = mp.Queue()
for i in xrange(1000):
    q.put(i)

deltas = []
while True:
    try:
        value1 = q.get(timeout=0.1)
        value2 = q.get(timeout=0.1)
        deltas.append(value2 - value1)
    except Queue.Empty:
        break

# positive deltas would indicate the numbers are coming out in increasing order
min_delta, max_delta = min(deltas), max(deltas)
avg_delta = sum(deltas) / len(deltas)

print "min", min_delta
print "max", max_delta
print "avg", avg_delta
prints: min, max, and average are exactly 1 (perfect FIFO)
I've looked over the Queue class that lives in Lib/multiprocessing/queues.py in my Python installation (Python 2.7, but nothing obvious is different in the version from Python 3.2 that I briefly checked). Here's how I understand it works:
There are two sets of objects that are maintained by the Queue object. One set consists of multiprocess-safe primitives that are shared by all processes. The others are created and used separately by each process.
The cross-process objects are set up in the __init__ method:
- A Pipe object, whose two ends are saved as self._reader and self._writer.
- A BoundedSemaphore object, which tracks the free capacity of the queue (and so optionally limits how many objects it can hold).
- A Lock object for reading the Pipe, and on non-Windows platforms another for writing. (I assume that this is because writing to a pipe is inherently multiprocess-safe on Windows.)
The per-process objects are set up in the _after_fork and _start_thread methods:
- A collections.deque object used to buffer writes to the Pipe.
- A threading.Condition object used to signal when the buffer is not empty.
- A threading.Thread object that does the actual writing. It is created lazily, so it won't exist until at least one write to the Queue has been requested in a given process.
- Various Finalize objects that clean stuff up when the process ends.
A get from the queue is pretty simple. You acquire the read lock, grab an object from the read end of the Pipe, and release the semaphore to free up a slot.
A put is more complicated. It uses multiple threads. The caller to put first acquires the semaphore (claiming a slot), then grabs the condition's lock, starts up the writer thread if it isn't running yet, adds its object to the buffer and signals the condition before unlocking it.
The writer thread loops forever (until canceled) in the _feed method. If the buffer is empty, it waits on the notempty condition. Then it takes an item from the buffer, acquires the write lock (if it exists) and writes the item to the Pipe.
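To make that walkthrough concrete, here is a heavily simplified sketch of the same design, written for Python 3. It is not the real implementation: MiniQueue is a made-up name, and it leaves out the semaphore, maxsize, timeouts, close() and the fork handling. It only shows how the pipe, the per-process buffer and the feeder thread fit together.

```python
import collections
import multiprocessing as mp
import threading


class MiniQueue(object):
    """Toy model of the design described above (hypothetical, not the stdlib code)."""

    def __init__(self):
        # Cross-process pieces: a one-way pipe plus a lock around each end.
        self._reader, self._writer = mp.Pipe(duplex=False)
        self._rlock = mp.Lock()
        self._wlock = mp.Lock()
        # Per-process pieces: a local buffer, a condition and a lazy thread.
        self._buffer = collections.deque()
        self._notempty = threading.Condition()
        self._thread = None

    def put(self, obj):
        with self._notempty:
            if self._thread is None:
                # The feeder thread is only started on the first put.
                self._thread = threading.Thread(target=self._feed)
                self._thread.daemon = True
                self._thread.start()
            self._buffer.append(obj)
            self._notempty.notify()

    def _feed(self):
        while True:
            with self._notempty:
                while not self._buffer:
                    self._notempty.wait()
                # popleft() keeps the local buffer FIFO.
                obj = self._buffer.popleft()
            with self._wlock:
                self._writer.send(obj)

    def get(self):
        with self._rlock:
            return self._reader.recv()


if __name__ == "__main__":
    q = MiniQueue()
    for i in range(5):
        q.put(i)
    print([q.get() for _ in range(5)])
```

In this sketch, swapping popleft() for pop() would only reorder items that are still sitting in the local buffer. Anything the feeder thread has already written to the pipe comes out in pipe order, so the result would be "approximately LIFO" at best.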
So, given all of that, can you modify it to get a LIFO queue? It doesn't seem easy. Pipes are inherently FIFO objects, and while the Queue can't guarantee FIFO behavior overall (due to the asynchronous nature of the writes from multiple processes) it is always going to be mostly FIFO.
If you have only a single consumer, you could get objects from the queue and add them to your own process-local stack. It would be harder to do a multi-consumer stack, though with shared memory a bounded-size stack wouldn't be too hard. You'd need a lock, a pair of conditions (for blocking/signaling on full and empty states), a shared integer value (for the number of values held) and a shared array of an appropriate type (for the values themselves).
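A rough sketch of that shared-memory stack, using exactly those ingredients, might look like the following (Python 3). SharedStack and its method names are made up for illustration, it only stores C ints by default, and it has no timeouts or close handling, so treat it as a starting point rather than a tested solution.

```python
import multiprocessing as mp


class SharedStack(object):
    """Bounded LIFO stack kept in shared memory (hypothetical helper)."""

    def __init__(self, capacity, typecode="i"):
        self._capacity = capacity
        # One lock guards everything; both conditions are built on it.
        self._lock = mp.Lock()
        self._not_empty = mp.Condition(self._lock)
        self._not_full = mp.Condition(self._lock)
        # Shared count of stored values and the shared storage itself.
        # lock=False because self._lock already protects them.
        self._count = mp.Value("i", 0, lock=False)
        self._items = mp.Array(typecode, capacity, lock=False)

    def push(self, value):
        with self._lock:
            while self._count.value == self._capacity:
                self._not_full.wait()  # block while the stack is full
            self._items[self._count.value] = value
            self._count.value += 1
            self._not_empty.notify()

    def pop(self):
        with self._lock:
            while self._count.value == 0:
                self._not_empty.wait()  # block while the stack is empty
            self._count.value -= 1
            value = self._items[self._count.value]
            self._not_full.notify()
            return value


if __name__ == "__main__":
    stack = SharedStack(capacity=4)
    for i in (1, 2, 3):
        stack.push(i)
    print(stack.pop(), stack.pop(), stack.pop())  # 3 2 1
```

The stack would be created in the parent and handed to each worker through the Process arguments, the same way you would pass an mp.Queue. Only fixed-size C types fit in the array, so arbitrary Python objects would need some extra encoding on top of this.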
There is a LIFO queue in the Queue module (queue in Python 3). It isn't exposed in the multiprocessing or multiprocessing.queues modules.
Replacing your line q = mp.Queue() with q = Queue.LifoQueue() and running it prints min, max and average as exactly -1.
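For anyone on Python 3, the same check can be sketched like this. The pairwise_deltas helper is just a name I made up to wrap the loop from the question; note that queue.LifoQueue is a thread-level queue and is not shared between processes.

```python
import queue


def pairwise_deltas(q, n=1000):
    """Fill q with 0..n-1, then return the difference within each pair read back."""
    for i in range(n):
        q.put(i)
    deltas = []
    while True:
        try:
            first = q.get_nowait()
            second = q.get_nowait()
        except queue.Empty:
            break
        deltas.append(second - first)
    return deltas


lifo = pairwise_deltas(queue.LifoQueue())
fifo = pairwise_deltas(queue.Queue())
print("lifo", min(lifo), max(lifo))  # lifo -1 -1
print("fifo", min(fifo), max(fifo))  # fifo 1 1
```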
(Also I think you should always get exactly FIFO/LIFO order when getting items from only one thread.)