I am looking for more insight into the Queue implementations in Python than I can find in the documentation.
From what I understand, and excuse my ignorance if I am wrong on this:
queue.Queue()
: is implemented with an ordinary in-memory data structure (a collections.deque in CPython), and so cannot be shared between multiple processes but can be shared between threads. So far, so good.
multiprocessing.Queue()
: is implemented through pipes (man 2 pipe), which have a size limit (rather tiny: on Linux, man 7 pipe says 65536 bytes untweaked):

    Since Linux 2.6.35, the default pipe capacity is 65536 bytes, but the capacity can be queried and set using the fcntl(2) F_GETPIPE_SZ and F_SETPIPE_SZ operations.
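Indeed, the capacity can be checked from Python directly; here is a quick sketch of mine, assuming Linux and Python 3.10+ (where the fcntl module exposes F_GETPIPE_SZ):

import fcntl
import os

# Query the capacity of a freshly created pipe (Linux only;
# fcntl.F_GETPIPE_SZ has been available since Python 3.10).
r, w = os.pipe()
print(fcntl.fcntl(w, fcntl.F_GETPIPE_SZ))  # typically prints 65536
os.close(r)
os.close(w)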
But, in Python, whenever I try to write data larger than 65536 bytes into the pipe, it succeeds without raising an exception, and I could flood my memory this way:
import multiprocessing
from time import sleep

def big():
    result = ""
    for i in range(1, 70000):
        result += "," + str(i)
    return result  # 408888-byte string

def writequeue(q):
    while True:
        q.put(big())
        sleep(0.1)

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=writequeue, args=(q,))
    p.start()

    while True:
        sleep(1)  # No pipe consumption, we just want to flood the pipe
So here are my questions:
Does Python tweak the pipe limit? If yes, by how much? Pointers to the Python source code are welcome.
Are Python piped communications interoperable with other, non-Python processes? If yes, working examples (JS preferably) and resource links are welcome.
multiprocessing.Queue creates a pipe, and writes to a full pipe block: writing more than the pipe's capacity will cause the write call to block until the reading end has cleared enough data. Ok, so if writes block when the pipe's capacity is reached, why is q.put() not also blocking once the pipe is full? Even the first call to q.put() in the example should fill up the pipe, and everything should block there, no?
No, it does not block, because the multiprocessing.Queue implementation decouples the .put() method from writes to the pipe. The .put() method enqueues the data passed to it in an internal buffer, and a separate feeder thread is charged with reading from this buffer and writing to the pipe. That thread will block when the pipe is full, but it will not prevent .put() from enqueuing more data into the internal buffer.
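You can see this with a quick sketch of mine (not part of the original question): even a payload far larger than the pipe capacity returns from .put() almost instantly, because only the feeder thread ever blocks on the pipe:

import multiprocessing
import time

q = multiprocessing.Queue()
start = time.monotonic()
q.put("x" * 1_000_000)  # roughly 1 MB, far above the 64 KiB pipe capacity
print("put returned after %.4f s" % (time.monotonic() - start))  # near zero
q.get()  # drain the queue so the feeder thread can finish and we exit cleanly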
The implementation of .put() saves the data to self._buffer; note how it kicks off a feeder thread if there is not one already running:
def put(self, obj, block=True, timeout=None):
    assert not self._closed
    if not self._sem.acquire(block, timeout):
        raise Full

    with self._notempty:
        if self._thread is None:
            self._start_thread()
        self._buffer.append(obj)
        self._notempty.notify()
The ._feed() method is what reads from self._buffer and feeds the data to the pipe, and ._start_thread() is what sets up the thread that runs ._feed().
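To make the mechanism concrete, here is a stripped-down sketch of the same idea (my own simplification, not the actual CPython code): .put() only touches an in-process buffer, and a background thread performs the potentially blocking pipe writes:

import collections
import threading

class BufferedWriter:
    # Sketch of the buffer + feeder-thread pattern used by multiprocessing.Queue.
    def __init__(self, conn):  # conn: the writing end of a multiprocessing.Pipe()
        self._conn = conn
        self._buffer = collections.deque()
        self._notempty = threading.Condition()
        self._thread = None

    def put(self, obj):
        # Never blocks on the pipe: we only append to the in-process buffer.
        with self._notempty:
            if self._thread is None:
                self._thread = threading.Thread(target=self._feed, daemon=True)
                self._thread.start()
            self._buffer.append(obj)
            self._notempty.notify()

    def _feed(self):
        # The only place that writes to the pipe, hence the only place
        # that can block on it.
        while True:
            with self._notempty:
                while not self._buffer:
                    self._notempty.wait()
                obj = self._buffer.popleft()
            self._conn.send(obj)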
If you want to limit how much data can be written into a queue, I don't see a way to do it by specifying a number of bytes, but you can limit the number of items stored in the internal buffer at any one time by passing a number to multiprocessing.Queue:
q = multiprocessing.Queue(2)
When I use the parameter above with your code, q.put() will enqueue two items and will block on the third attempt.
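As a quick illustration (my own sketch, not from the question), a non-blocking put on the third item raises queue.Full once the two slots are taken:

import multiprocessing
import queue  # multiprocessing.Queue raises queue.Full

q = multiprocessing.Queue(2)  # at most two items buffered at once
q.put("a")
q.put("b")
try:
    q.put("c", block=False)  # the bounding semaphore is exhausted
except queue.Full:
    print("queue is full")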
It depends. The facilities provided by the multiprocessing module are not easily interoperable with other languages. I expect it would be possible to make multiprocessing interoperate with other languages, but achieving this goal would be a major undertaking. The module is written with the expectation that the processes involved are running Python code.

If you look at more general methods, then the answer is yes. You could use a socket as a communication channel between two different processes. For instance, here is a JavaScript process that reads from a named Unix socket:
var net = require("net");
var fs = require("fs");

var sockPath = "/tmp/test.sock";

try {
    fs.unlinkSync(sockPath);
}
catch (ex) {
    // Don't care if the path does not exist, but rethrow if we get
    // another error.
    if (ex.code !== "ENOENT") {
        throw ex;
    }
}

var server = net.createServer(function(stream) {
    stream.on("data", function(c) {
        console.log("received:", c.toString());
    });
    stream.on("end", function() {
        server.close();
    });
});

server.listen(sockPath);
And a Python process that writes to it:
import socket
import time

sockfile = "/tmp/test.sock"
conn = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
conn.connect(sockfile)

count = 0
while True:
    count += 1
    conn.sendall(bytes(str(count), "utf-8"))
    time.sleep(1)
If you want to try the above, you need to start the JavaScript side first so that the Python side has something to connect to. This is a proof of concept; a complete solution would need more polish.

In order to pass complex structures from Python to other languages, you'll have to find a way to serialize your data in a format that can be read on both sides. Pickles are unfortunately Python-specific. I generally pick JSON whenever I need to serialize between languages, or use an ad hoc format if JSON won't do.
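For instance, here is a minimal sketch of mine (reusing the /tmp/test.sock reader above) that sends a JSON-encoded structure, one document per line so the JavaScript side can split on newlines and call JSON.parse():

import json
import socket

sockfile = "/tmp/test.sock"  # same socket as in the example above
conn = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
conn.connect(sockfile)

message = {"count": 1, "values": [1, 2, 3]}
# One JSON document per line keeps message boundaries unambiguous.
conn.sendall(json.dumps(message).encode("utf-8") + b"\n")
conn.close()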