
Implementation of multiprocessing.Queue and queue.Queue

I am looking for more insights on the Queues implementations in Python than I can find in the documentation.

From what I understood, and excuse my ignorance if I am wrong on this:

queue.Queue(): is implemented on top of an in-process collections.deque, and so cannot be shared between multiple processes but can be shared between threads. So far, so good.

multiprocessing.Queue(): is implemented through pipes (man 2 pipe), which have a size limit (rather small: on Linux, man 7 pipe says 65536 bytes untweaked):

Since Linux 2.6.35, the default pipe capacity is 65536 bytes, but the capacity can be queried and set using the fcntl(2) F_GETPIPE_SZ and F_SETPIPE_SZ operations

But in Python, whenever I write data larger than 65536 bytes into the queue, it succeeds without an exception - I could flood my memory this way:

import multiprocessing
from time import sleep

def big():
    result = ""
    for i in range(1,70000):
        result += ","+str(i)
    return result # 408888 bytes string

def writequeue(q):
    while True:
        q.put(big())
        sleep(0.1)

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=writequeue, args=(q,))
    p.start()
    while True:
        sleep(1) # No pipe consumption, we just want to flood the pipe

So here are my questions:

  • Does Python tweak the pipe limit? If yes, by how much? Pointers to the Python source code are welcome.

  • Are Python piped communications interoperable with non-Python processes? If yes, working examples (JS preferably) and resource links are welcome.

Asked Jul 17 '17 by Fabien



1 Answer

Why is q.put() not blocking?

multiprocessing.Queue creates a pipe, and a write to a pipe blocks once the pipe is full: the write call cannot complete until the reading end has cleared enough data. Ok, so if writes block when the pipe's capacity is reached, why is q.put() not also blocking once the pipe is full? Even the first call to q.put() in the example writes more than the pipe's capacity, so everything should block there, no?

No, it does not block, because the multiprocessing.Queue implementation decouples the .put() method from writes to the pipe. The .put() method enqueues the data passed to it in an internal buffer, and there is a separate thread which is charged with reading from this buffer and writing to the pipe. This thread will block when the pipe is full, but it will not prevent .put() from enqueuing more data into the internal buffer.
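A minimal sketch of that behavior (the 400 kB payload and loop count are arbitrary): even though each item is far larger than the 64 KiB pipe capacity, every .put() returns immediately, because it only appends to the internal buffer.

```python
import multiprocessing
import time

if __name__ == "__main__":
    q = multiprocessing.Queue()
    payload = "x" * 400_000  # far larger than the 64 KiB pipe capacity

    start = time.monotonic()
    for _ in range(10):
        q.put(payload)  # returns at once; the feeder thread deals with the pipe
    elapsed = time.monotonic() - start

    print(f"10 puts of ~400 kB each took {elapsed:.3f}s")  # a few milliseconds
    q.cancel_join_thread()  # don't wait at exit for the feeder to drain the buffer
```

The cancel_join_thread() call matters here: without it, the process would hang at exit, waiting for the feeder thread to flush 4 MB of unread data into a pipe nobody reads.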

The implementation of .put() appends the data to self._buffer; note how it kicks off the feeder thread if one is not already running:

def put(self, obj, block=True, timeout=None):
    assert not self._closed
    if not self._sem.acquire(block, timeout):
        raise Full

    with self._notempty:
        if self._thread is None:
            self._start_thread()
        self._buffer.append(obj)
        self._notempty.notify()

The ._feed() method is what reads from self._buffer and feeds the data to the pipe. And ._start_thread() is what sets up a thread that runs ._feed().
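You can observe the feeder thread being created on the first put. In CPython the thread is named QueueFeederThread, though that name is an implementation detail:

```python
import multiprocessing
import threading

if __name__ == "__main__":
    q = multiprocessing.Queue()
    before = {t.name for t in threading.enumerate()}
    q.put("hello")  # the first put starts the feeder thread
    after = {t.name for t in threading.enumerate()}
    print(after - before)  # in CPython: {'QueueFeederThread'}
    q.cancel_join_thread()
```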

How can I limit queue size?

If you want to limit how much data can be written into a queue, I don't see a way to specify a limit in bytes, but you can cap the number of items held in the internal buffer at any one time by passing a maxsize to multiprocessing.Queue:

q = multiprocessing.Queue(2)

With the maxsize above and your code, q.put() will enqueue two items and block on the third attempt.
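A short sketch of that limit, using put_nowait so the full queue raises queue.Full instead of blocking:

```python
import multiprocessing
import queue  # multiprocessing re-raises queue.Full / queue.Empty

if __name__ == "__main__":
    q = multiprocessing.Queue(2)   # at most 2 items in flight
    q.put("a")
    q.put("b")
    try:
        q.put_nowait("c")          # third item: the counting semaphore is exhausted
    except queue.Full:
        print("queue full after 2 items")
    q.get()                        # consuming one item frees a slot
    q.put("c")                     # now succeeds
    q.cancel_join_thread()
```

Note that the limit is enforced by a semaphore counting items, not bytes, so two huge items and two tiny ones count the same.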

Are Python piped communications inter-operable with other non-Python processes?

It depends. The facilities provided by the multiprocessing module are not easily interoperable with other languages. I expect it would be possible to make multiprocessing interoperate with other languages, but achieving this goal would be a major enterprise. The module is written with the expectation that the processes involved are running Python code.

If you look at more general methods, then the answer is yes. You could use a socket as a communication pipe between two different processes. For instance, a JavaScript process that reads from a named socket:

var net = require("net");
var fs = require("fs");

var sockPath = "/tmp/test.sock";
try {
    fs.unlinkSync(sockPath);
}
catch (ex) {
    // Don't care if the path does not exist, but rethrow if we get
    // another error.
    if (ex.code !== "ENOENT") {
        throw ex;
    }
}

var server = net.createServer(function(stream) {
  stream.on("data", function(c) {
    console.log("received:", c.toString());
  });

  stream.on("end", function() {
    server.close();
  });
});

server.listen(sockPath);

And a Python process that writes to it:

import socket
import time

sockfile = "/tmp/test.sock"

conn = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
conn.connect(sockfile)

count = 0
while True:
    count += 1
    conn.sendall(bytes(str(count), "utf-8"))
    time.sleep(1)

If you want to try the above, you need to start the JavaScript side first so that the Python side has something to write to. This is a proof-of-concept. A complete solution would need more polish.

In order to pass complex structures from Python to other languages, you'll have to find a way to serialize your data in a format that can be read on both sides. Pickles are unfortunately Python-specific. I generally pick JSON whenever I need to serialize between languages, or use an ad-hoc format if JSON won't do it.
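As a sketch of that approach (the record and its fields are made up): newline-delimited JSON is a simple framing convention that any language with a JSON parser can consume. A socketpair stands in here for the Python-to-JavaScript connection so the example runs self-contained; in practice you would connect() to the named socket the other process listens on.

```python
import json
import socket

# Two connected sockets simulating the two processes.
writer, reader = socket.socketpair()

record = {"count": 1, "tags": ["a", "b"], "ok": True}

# One JSON document per line: the receiver splits on "\n" and parses each line.
writer.sendall((json.dumps(record) + "\n").encode("utf-8"))
writer.close()

line = reader.makefile("r", encoding="utf-8").readline()
print(json.loads(line))  # {'count': 1, 'tags': ['a', 'b'], 'ok': True}
reader.close()
```

On the JavaScript side the equivalent would be splitting the stream on newlines and calling JSON.parse on each chunk.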

Answered Oct 05 '22 by Louis