In my application I'm using pipes from the multiprocessing module to communicate between Python processes. Lately I've observed some weird behaviour that depends on the size of the data I'm sending through them. According to the Python documentation, these pipes are based on Connection objects and should behave asynchronously, yet they sometimes get stuck on send. If I enable full duplex on each connection, everything works fine, even though I'm not using the connections for both sending and receiving. Can anyone explain this behaviour?
Code (it's not my production code, it just illustrates what I mean):
from collections import deque
from multiprocessing import Process, Pipe
from numpy.random import randn
from os import getpid

PROC_NR = 4
DATA_POINTS = 100
# DATA_POINTS = 10000


def arg_passer(pipe_in, pipe_out, list_):
    my_pid = getpid()
    print "{}: Before send".format(my_pid)
    pipe_out.send(list_)
    print "{}: After send, before recv".format(my_pid)
    buf = pipe_in.recv()
    print "{}: After recv".format(my_pid)


if __name__ == "__main__":
    pipes = [Pipe(False) for _ in range(PROC_NR)]
    # pipes = [Pipe(True) for _ in range(PROC_NR)]
    pipes_in = deque(p[0] for p in pipes)
    pipes_out = deque(p[1] for p in pipes)
    # rotate so that each process reads from one neighbour and writes
    # to the other, forming a ring of PROC_NR processes
    pipes_in.rotate(1)
    pipes_out.rotate(-1)

    data = [randn(DATA_POINTS) for foo in xrange(PROC_NR)]
    processes = [Process(target=arg_passer, args=(pipes_in[foo], pipes_out[foo], data[foo]))
                 for foo in xrange(PROC_NR)]

    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
First of all, it's worth noting the implementation of the multiprocessing.Pipe function...
def Pipe(duplex=True):
    '''
    Returns pair of connection objects at either end of a pipe
    '''
    if duplex:
        s1, s2 = socket.socketpair()
        s1.setblocking(True)
        s2.setblocking(True)
        c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
        c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
        s1.close()
        s2.close()
    else:
        fd1, fd2 = os.pipe()
        c1 = _multiprocessing.Connection(fd1, writable=False)
        c2 = _multiprocessing.Connection(fd2, readable=False)

    return c1, c2
The difference is that half-duplex 'Pipes' use an anonymous pipe, whereas full-duplex 'Pipes' actually use a Unix domain socket, since anonymous pipes are unidirectional by nature.
I'm not sure what you mean by the term "asynchronous" in this context. If you mean "non-blocking I/O" then it's worth noting that both implementations use blocking I/O by default.
Secondly, it's worth noting the pickled size of the data you're trying to send...
>>> from numpy.random import randn
>>> from cPickle import dumps
>>> len(dumps(randn(100)))
2479
>>> len(dumps(randn(10000)))
237154
Thirdly, from the pipe(7) manpage...
Pipe Capacity
A pipe has a limited capacity. If the pipe is full, then a write(2) will block or fail, depending on whether the O_NONBLOCK flag is set (see below). Different implementations have different limits for the pipe capacity. Applications should not rely on a particular capacity: an application should be designed so that a reading process consumes data as soon as it is available, so that a writing process does not remain blocked.
In Linux versions before 2.6.11, the capacity of a pipe was the same as the system page size (e.g., 4096 bytes on i386). Since Linux 2.6.11, the pipe capacity is 65536 bytes.
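If you want to check the capacity on your own system, one quick way (a sketch, assuming Linux, and written in Python 2 to match the code above) is to make the write end of a raw os.pipe() non-blocking and count how many bytes you can stuff into it before the write fails:

import os
import fcntl

r, w = os.pipe()

# make the write end non-blocking, so writing to a full pipe raises
# EAGAIN instead of blocking
flags = fcntl.fcntl(w, fcntl.F_GETFL)
fcntl.fcntl(w, fcntl.F_SETFL, flags | os.O_NONBLOCK)

written = 0
try:
    while True:
        written += os.write(w, '\0' * 1024)
except OSError:
    pass

print "pipe capacity: {} bytes".format(written)  # 65536 on Linux >= 2.6.11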
So, in effect, you've created a deadlock where all the subprocesses have blocked on the pipe_out.send() call, and none of them can receive any data from the other processes, because you're sending all 237,154 bytes of data in one hit, which has filled the 65,536 byte buffer.
You might be tempted just to use the Unix domain socket version, but the only reason it works at present is that it has a larger buffer size than a pipe, and you'll find that solution will also fail if you increase DATA_POINTS to 100,000.
The "quick n' dirty hack" solution is to break the data into smaller chunks for sending, but it's not good practice to rely on the buffers being a specific size.
A better solution would be to use non-blocking I/O on the pipe_out.send() call, although I'm not familiar enough with the multiprocessing module to determine the best way to achieve it using that module.
The pseudocode would be along the lines of...
while 1:
    if we have sent all data and received all data:
        break

    send as much data as we can without blocking
    receive as much data as we can without blocking

    if we didn't send or receive anything in this iteration:
        sleep for a bit so we don't waste CPU time
        continue
...or you can use the Python select module to avoid sleeping for longer than is necessary, but, again, integrating it with multiprocessing.Pipe might be tricky.
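For what it's worth, Connection objects expose a fileno() method, so on Unix you can hand them straight to select.select(). Here's a rough, untested sketch of a hypothetical recv_available() helper that waits briefly for data instead of spinning on poll():

import select

def recv_available(pipe_in, timeout=0.01):
    # wait up to `timeout` seconds for the connection to become readable,
    # then drain every message that's already waiting
    messages = []
    readable, _, _ = select.select([pipe_in], [], [], timeout)
    while readable:
        messages.append(pipe_in.recv())
        readable, _, _ = select.select([pipe_in], [], [], 0)
    return messages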
It's possible that the multiprocessing.Queue class does all this for you, but I've never used it before, so you'd have to do some experiments.
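For reference, the documentation does say that multiprocessing.Queue uses a background feeder thread to flush queued data to the underlying pipe, so put() shouldn't block even when the pipe is full. An untested sketch of the same ring built on queues (reusing the rotation trick from the question):

from collections import deque
from multiprocessing import Process, Queue
from numpy.random import randn

PROC_NR = 4
DATA_POINTS = 10000

def arg_passer(q_in, q_out, data):
    # put() hands the data to a feeder thread and returns immediately,
    # so a full pipe no longer blocks this process
    q_out.put(data)
    buf = q_in.get()

if __name__ == "__main__":
    queues = [Queue() for _ in range(PROC_NR)]
    queues_out = deque(queues)
    queues_in = deque(queues)
    queues_in.rotate(1)  # each process reads its neighbour's queue

    data = [randn(DATA_POINTS) for foo in xrange(PROC_NR)]
    processes = [Process(target=arg_passer,
                         args=(queues_in[foo], queues_out[foo], data[foo]))
                 for foo in xrange(PROC_NR)]

    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()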