I have a custom simulator (for biology) running on a 64-bit Linux (kernel version 2.6.28.4) machine using a 64-bit Python 3.3.0 CPython interpreter.
Because the simulator depends on many independent experiments for valid results, I built in parallel processing for running experiments. Communication between the processes primarily occurs under a producer-consumer pattern with managed multiprocessing Queues (doc).
The rundown of the architecture is as follows:
[Diagram: Processes and the various Queues]
The master process and the worker processes communicate via an input Queue. Similarly, the worker processes place their results in an output Queue, from which the result consumer process consumes items. The final ResultConsumer object is passed via a multiprocessing Pipe (doc) back to the master process.
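To make the layout concrete, here is a stripped-down sketch of the worker and consumer loops; run_experiment and ResultConsumer are trivial stand-ins for my actual classes:

import multiprocessing

def run_experiment(params):            # stand-in for the real simulation
    return params * 2

class ResultConsumer:                  # stand-in for the real aggregator
    def __init__(self):
        self.results = []
    def add(self, result):
        self.results.append(result)

def worker(in_queue, out_queue):
    for params in iter(in_queue.get, None):    # None is the shutdown sentinel
        out_queue.put(run_experiment(params))

def consume_results(out_queue, pipe_conn, n_experiments):
    res_con = ResultConsumer()
    for _ in range(n_experiments):
        res_con.add(out_queue.get())
    pipe_conn.send(res_con)    # the send that fails once res_con gets big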
Everything works fine until it tries to pass the ResultConsumer object back to the master process via the Pipe:
Traceback (most recent call last):
  File "/home/cmccorma/.local/lib/python3.3/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/home/cmccorma/.local/lib/python3.3/multiprocessing/process.py", line 95, in run
    self._target(*self._args, **self._kwargs)
  File "DomainArchitectureGenerator.py", line 93, in ResultsConsumerHandler
    pipeConn.send(resCon)
  File "/home/cmccorma/.local/lib/python3.3/multiprocessing/connection.py", line 207, in send
    self._send_bytes(buf.getbuffer())
  File "/home/cmccorma/.local/lib/python3.3/multiprocessing/connection.py", line 394, in _send_bytes
    self._send(struct.pack("!i", n))
struct.error: 'i' format requires -2147483648 <= number <= 2147483647
I understand the first two traces (unhandled exits in the Process library), and the third is my line of code for sending the ResultConsumer object down the Pipe to the master process. The last two traces are where it gets interesting. A Pipe pickles any object that is sent to it and passes the resulting bytes to the other end (the matching connection), where it is unpickled upon running recv(). self._send_bytes(buf.getbuffer()) is attempting to send the bytes of the pickled object. self._send(struct.pack("!i", n)) is attempting to pack a struct with an integer (network/big-endian byte order) of length n, where n is the length of the buffer passed in as a parameter (the struct library handles conversions between Python values and C structs represented as Python bytes objects; see the doc).
This error only occurs when attempting a lot of experiments, e.g. 10 experiments will not cause it, but 1000 will consistently (all other parameters being constant). My best hypothesis so far as to why struct.error is thrown is that the number of bytes trying to be pushed down the pipe exceeds 2^31 - 1 (2147483647), i.e. ~2 GB.
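The limit is easy to reproduce in isolation by packing a length header the same way connection.py does:

import struct

struct.pack("!i", 2**31 - 1)   # fine: the largest value a signed 32-bit int holds
struct.pack("!i", 2**31)       # raises the same struct.error as in my traceback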
So my question is two-fold:
1. I'm getting stuck with my investigations, as struct.py essentially just imports from _struct and I have no idea where that is.
2. The byte limit seems arbitrary given that the underlying architecture is all 64-bit. So why can't I pass anything larger than that? Additionally, if I can't change this, are there any good (read: easy) workarounds to this issue?
Note: I don't think that using a Queue in place of a Pipe will solve the issue, as I suspect that Queues use a similar pickling intermediate step. EDIT: This note is entirely incorrect, as pointed out in abarnert's answer.
I'm getting stuck with my investigations as struct.py essentially just imports from _struct and I have no idea where that is.
In CPython, _struct is a C extension module built from _struct.c in the Modules directory in the source tree. You can find the code online here.
Whenever foo.py does an import _foo, that's almost always a C extension module, usually built from _foo.c. And if you can't find a foo.py at all, it's probably a C extension module, built from _foomodule.c.
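You can ask the interpreter where it actually loaded _struct from; on some builds the module is compiled into the interpreter itself, so __file__ may be absent:

import _struct
print(getattr(_struct, "__file__", "built into the interpreter"))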
It's also often worth looking at the equivalent PyPy source, even if you're not using PyPy. They reimplement almost all extension modules in pure Python—and for the remainder (including this case), the underlying "extension language" is RPython, not C.
However, in this case, you don't need to know anything about how struct works beyond what's in the docs.
The byte limit seems arbitrary given that the underlying architecture is all 64-bit.
Look at the code it's calling:
self._send(struct.pack("!i", n))
If you look at the documentation, the 'i' format character explicitly means "4-byte C integer", not "whatever ssize_t is". For that, you'd have to use 'n'. Or you might want to explicitly use a long long, with 'q'.
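You can check the sizes directly. One caveat, which is why the patch below sticks with 'q': per the struct docs, 'n' is only available in native byte order, so it can't be combined with '!':

import struct

print(struct.calcsize("!i"))     # 4 bytes, so lengths cap out at 2**31 - 1
print(struct.calcsize("!q"))     # 8 bytes, plenty for any realistic buffer
struct.pack("!q", 5 * 2**30)     # a 5 GB length packs without complaint
# struct.calcsize("!n") raises struct.error: 'n' needs native byte order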
You can monkeypatch multiprocessing to use struct.pack('!q', n), or encode the length in some way other than struct. This will, of course, break compatibility with non-patched multiprocessing, which could be a problem if you're trying to do distributed processing across multiple computers or something. But it should be pretty simple:
def _send_bytes(self, buf):
    # For wire compatibility with 3.2 and lower
    n = len(buf)
    self._send(struct.pack("!q", n))  # was !i
    # The condition is necessary to avoid "broken pipe" errors
    # when sending a 0-length buffer if the other end closed the pipe.
    if n > 0:
        self._send(buf)

def _recv_bytes(self, maxsize=None):
    buf = self._recv(8)  # was 4
    size, = struct.unpack("!q", buf.getvalue())  # was !i
    if maxsize is not None and size > maxsize:
        return None
    return self._recv(size)
Of course there's no guarantee that this change is sufficient; you'll want to read through the rest of the surrounding code and test the hell out of it.
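To actually install the patch, you'd rebind the private methods on the Connection class. A sketch, assuming the two functions above are in scope; these are internal names specific to the 3.3 implementation, so expect it to break across versions:

import multiprocessing.connection

multiprocessing.connection.Connection._send_bytes = _send_bytes
multiprocessing.connection.Connection._recv_bytes = _recv_bytes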
Note: I suspect that using a Queue in place of a Pipe will not solve the issue, as I suspect that Queues use a similar pickling intermediate step.
Well, the problem has nothing to do with pickling. Pipe isn't using pickle to send the length, it's using struct. You can verify that pickle wouldn't have this problem: pickle.loads(pickle.dumps(1<<100)) == 1<<100 will return True.
(In earlier versions, pickle also had problems with huge objects, e.g. a list of 2G elements, which could have caused problems at a scale about 8x as high as the one you're currently hitting. But that's been fixed by 3.3.)
Meanwhile… wouldn't it be faster to just try it and see, instead of digging through the source to try to figure out whether it would work?
Also, are you sure you really want to pass around a 2GB data structure by implicit pickling?
If I were doing something that slow and memory-hungry, I'd prefer to make that explicit, e.g. pickle to a tempfile and send the path or fd. (If you're using numpy or pandas or something, use its binary file format instead of pickle, but same idea.)
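For example, something along these lines; just a sketch, with send_via_tempfile/recv_via_tempfile being names I made up, and temp-file cleanup left to the caller:

import pickle
import tempfile

def send_via_tempfile(conn, obj):
    # Dump the big object to disk and send only the short path string.
    with tempfile.NamedTemporaryFile(suffix=".pkl", delete=False) as f:
        pickle.dump(obj, f, protocol=pickle.HIGHEST_PROTOCOL)
    conn.send(f.name)   # a path easily fits the 32-bit length header

def recv_via_tempfile(conn):
    with open(conn.recv(), "rb") as f:
        return pickle.load(f)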
Or, even better, share the data. Yes, mutable shared state is bad… but sharing immutable objects is fine. Whatever you've got 2GB of, can you put it in a multiprocessing.Array, or put it in a ctypes array or struct (of arrays or structs of …) that you can share via multiprocessing.sharedctypes, or ctypes it out of a file that you mmap on both sides, or…? There's a bit of extra code to define and pick apart the structures, but when the benefits are likely to be this big, it's worth trying.
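For instance, a minimal sketch with multiprocessing.Array; the size and element type here are placeholders for whatever your 2GB actually is:

import ctypes
import multiprocessing

# One flat array of doubles in shared memory; forked workers inherit a
# view of it instead of each receiving a multi-GB pickled copy.
shared = multiprocessing.Array(ctypes.c_double, 1024 * 1024, lock=False)

def worker(start, stop):
    return sum(shared[i] for i in range(start, stop))   # reads, no copies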
Finally, when you think you've found a bug/obvious missing feature/unreasonable limitation in Python, it's worth looking at the bug tracker. It looks like issue 17560: problem using multiprocessing with really big objects? is exactly your problem, and has lots of information, including suggested workarounds.