
Byte limit when transferring Python objects between Processes using a Pipe?

I have a custom simulator (for biology) running on a 64-bit Linux (kernel version 2.6.28.4) machine using a 64-bit Python 3.3.0 CPython interpreter.

Because the simulator depends on many independent experiments for valid results, I built in parallel processing for running experiments. Communication between the processes primarily occurs under a producer-consumer pattern with managed multiprocessing Queues (doc). The rundown of the architecture is as follows:

  • a master process that handles spawning and managing Processes and the various Queues
  • N worker processes that do simulations
  • 1 result consumer process that consumes, sorts, and analyzes the results of each simulation

The master process and the worker processes communicate via an input Queue. Similarly, the worker processes place their results in an output Queue, from which the result consumer process consumes items. The final ResultConsumer object is passed via a multiprocessing Pipe (doc) back to the master process.
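For context, a minimal sketch of that layout looks something like this (the names and the trivial "simulation" are placeholders, not my real code):

import multiprocessing as mp

def worker(task_q, result_q):
    # N of these run the experiments; a None task means "shut down".
    for task in iter(task_q.get, None):
        result_q.put(task * task)  # stand-in for a real simulation

def consumer(result_q, conn, n_results):
    # Gathers every result, then sends the collected object back over the Pipe.
    results = []
    for _ in range(n_results):
        results.append(result_q.get())
    conn.send(results)  # stand-in for the final ResultConsumer object

if __name__ == '__main__':
    n_workers, n_tasks = 4, 20
    task_q, result_q = mp.Queue(), mp.Queue()
    parent_conn, child_conn = mp.Pipe()

    workers = [mp.Process(target=worker, args=(task_q, result_q))
               for _ in range(n_workers)]
    cons = mp.Process(target=consumer, args=(result_q, child_conn, n_tasks))
    for p in workers + [cons]:
        p.start()

    for t in range(n_tasks):
        task_q.put(t)
    for _ in workers:
        task_q.put(None)  # one shutdown sentinel per worker

    final = parent_conn.recv()  # blocks until the consumer sends its object
    for p in workers + [cons]:
        p.join()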

Everything works fine until the program tries to pass the ResultConsumer object back to the master process via the Pipe:

Traceback (most recent call last):
  File "/home/cmccorma/.local/lib/python3.3/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/home/cmccorma/.local/lib/python3.3/multiprocessing/process.py", line 95, in run
    self._target(*self._args, **self._kwargs)
  File "DomainArchitectureGenerator.py", line 93, in ResultsConsumerHandler
    pipeConn.send(resCon)
  File "/home/cmccorma/.local/lib/python3.3/multiprocessing/connection.py", line 207, in send
    self._send_bytes(buf.getbuffer())
  File "/home/cmccorma/.local/lib/python3.3/multiprocessing/connection.py", line 394, in _send_bytes
    self._send(struct.pack("!i", n))
struct.error: 'i' format requires -2147483648 <= number <= 2147483647

I understand the first two traces (unhandled exits in the Process library), and the third is my line of code for sending the ResultConsumer object down the Pipe to the master process. The last two traces are where it gets interesting. A Pipe pickles any object that is sent to it and passes the resulting bytes to the other end (the matching connection), where the object is unpickled upon running recv(). self._send_bytes(buf.getbuffer()) is attempting to send the bytes of the pickled object. self._send(struct.pack("!i", n)) is attempting to pack the integer n (in network/big-endian byte order) into a struct, where n is the length of the buffer passed in as a parameter (the struct library handles conversions between Python values and C structs represented as Python bytes objects, see the doc).

This error only occurs when attempting a lot of experiments, e.g. 10 experiments will not cause it, but 1000 will consistently (all other parameters being constant). My best hypothesis so far as to why struct.error is thrown is that the number of bytes being pushed down the pipe exceeds 2^31-1 (2147483647), or ~2 GB.
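A quick sanity check of struct on its own, outside of my simulator, seems to confirm this:

import struct

n_ok = 2**31 - 1          # largest value that fits in a signed 4-byte int
n_too_big = 2**31         # i.e. a payload of 2 GB or more

struct.pack("!i", n_ok)       # works
struct.pack("!i", n_too_big)  # struct.error: 'i' format requires
                              # -2147483648 <= number <= 2147483647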

So my question is two-fold:

  1. I'm getting stuck with my investigations as struct.py essentially just imports from _struct and I have no idea where that is.

  2. The byte limit seems arbitrary given that the underlying architecture is all 64-bit. So, why can't I pass anything larger than that? Additionally, if I can't change this, are there any good (read: easy) workarounds to this issue?

Note: I don't think that using a Queue in place of a Pipe will solve the issue, as I suspect that Queues use a similar pickling intermediate step. EDIT: This note is entirely incorrect, as pointed out in abarnert's answer.

Asked May 15 '13 by Collin M

1 Answer

I'm getting stuck with my investigations as struct.py essentially just imports from _struct and I have no idea where that is.

In CPython, _struct is a C extension module built from _struct.c in the Modules directory in the source tree. You can find the code online here.

Whenever foo.py does an import _foo, that's almost always a C extension module, usually built from _foo.c. And if you can't find a foo.py at all, it's probably a C extension module, built from _foomodule.c.
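If you just want to see where the module you imported actually lives, a quick check like this usually works (extension modules built as shared libraries have a __file__ attribute; modules compiled directly into the interpreter don't):

import _struct

print(getattr(_struct, "__file__", "compiled into the interpreter"))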

It's also often worth looking at the equivalent PyPy source, even if you're not using PyPy. They reimplement almost all extension modules in pure Python—and for the remainder (including this case), the underlying "extension language" is RPython, not C.

However, in this case, you don't need to know anything about how struct is working beyond what's in the docs.


The byte limit seems arbitrary given that the underlying architecture is all 64-bit.

Look at the code it's calling:

self._send(struct.pack("!i", n))

If you look at the documentation, the 'i' format character explicitly means "4-byte C integer", not "whatever ssize_t is". For that, you'd have to use 'n' (which is only available in native mode, not with the '!' byte-order prefix). Or you might want to explicitly use a long long, with 'q'.
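You can check the sizes (and the restriction on 'n') directly with struct.calcsize:

import struct

print(struct.calcsize("!i"))  # 4 bytes, on any platform
print(struct.calcsize("!q"))  # 8 bytes, on any platform
print(struct.calcsize("n"))   # 8 on a 64-bit build (ssize_t, native mode only)
# struct.calcsize("!n") raises struct.error: 'n' is not allowed
# with a standard (non-native) byte-order prefix.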

You can monkeypatch multiprocessing to use struct.pack('!q', n), or encode the length in some way other than struct. This will, of course, break compatibility with non-patched multiprocessing, which could be a problem if you're trying to do distributed processing across multiple computers or something. But it should be pretty simple:

def _send_bytes(self, buf):
    # For wire compatibility with 3.2 and lower
    n = len(buf)
    self._send(struct.pack("!q", n)) # was !i
    # The condition is necessary to avoid "broken pipe" errors
    # when sending a 0-length buffer if the other end closed the pipe.
    if n > 0:
        self._send(buf)

def _recv_bytes(self, maxsize=None):
    buf = self._recv(8) # was 4
    size, = struct.unpack("!q", buf.getvalue()) # was !i
    if maxsize is not None and size > maxsize:
        return None
    return self._recv(size)
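Applying the patch might look roughly like this; this sketch assumes the pure-Python Connection class that ships with 3.3's multiprocessing.connection (on Windows, the class to patch is PipeConnection instead), and both ends of the Pipe have to be patched:

import multiprocessing.connection as mpc

# Replace the length-prefix helpers with the '!q' versions defined above.
mpc.Connection._send_bytes = _send_bytes
mpc.Connection._recv_bytes = _recv_bytes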

Of course there's no guarantee that this change is sufficient; you'll want to read through the rest of the surrounding code and test the hell out of it.


Note: I suspect that using a Queue in place of a Pipe will not solve the issue, as I suspect that Queues use a similar pickling intermediate step.

Well, the problem has nothing to do with pickling. Pipe isn't using pickle to send the length, it's using struct. You can verify that pickle wouldn't have this problem: pickle.loads(pickle.dumps(1<<100)) == 1<<100 will return True.

(In earlier versions, pickle also had problems with huge objects—e.g., a list of 2G elements—which could have caused problems at a scale about 8x as high as the one you're currently hitting. But that's been fixed by 3.3.)

Meanwhile… wouldn't it be faster to just try it and see, instead of digging through the source to try to figure out whether it would work?


Also, are you sure you really want to pass around a 2GB data structure by implicit pickling?

If I were doing something that slow and memory-hungry, I'd prefer to make that explicit—e.g., pickle to a tempfile and send the path or fd. (If you're using numpy or pandas or something, use its binary file format instead of pickle, but same idea.)
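As a sketch of that explicit approach (the helper names here are made up, not an existing API): pickle the object to a temporary file in the child, send just the short path down the Pipe, and load it on the other side.

import os
import pickle
import tempfile

def send_via_tempfile(conn, obj):
    # Pickle to a temp file and send only the path down the Pipe.
    fd, path = tempfile.mkstemp(suffix=".pkl")
    with os.fdopen(fd, "wb") as f:
        pickle.dump(obj, f, protocol=pickle.HIGHEST_PROTOCOL)
    conn.send(path)

def recv_via_tempfile(conn):
    path = conn.recv()
    with open(path, "rb") as f:
        obj = pickle.load(f)
    os.remove(path)
    return obj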

Or, even better, share the data. Yes, mutable shared state is bad… but sharing immutable objects is fine. Whatever you've got 2GB of, can you put it in a multiprocessing.Array, or put it in a ctypes array or struct (of arrays or structs of …) that you can share via multiprocessing.sharedctypes, or ctypes it out of a file that you mmap on both sides, or…? There's a bit of extra code to define and pick apart the structures, but when the benefits are likely to be this big, it's worth trying.
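For example, a shared ctypes array (a minimal sketch, assuming your 2GB is something array-like such as a big block of doubles):

import ctypes
import multiprocessing as mp

def worker(shared):
    shared[0] = 42.0  # the child writes straight into the shared memory

if __name__ == '__main__':
    # One flat block of doubles in shared memory; the data itself is never pickled.
    shared = mp.Array(ctypes.c_double, 1000000, lock=False)
    p = mp.Process(target=worker, args=(shared,))
    p.start()
    p.join()
    print(shared[0])  # 42.0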


Finally, when you think you've found a bug/obvious missing feature/unreasonable limitation in Python, it's worth looking at the bug tracker. It looks like issue 17560: problem using multiprocessing with really big objects? is exactly your problem, and has lots of information, including suggested workarounds.

Answered by abarnert