
Synchronous/asynchronous behaviour of Python Pipes

In my application I'm using pipes from the multiprocessing module to communicate between Python processes. Lately I've observed some weird behaviour that depends on the size of the data being sent through them. According to the Python documentation, these pipes are based on connection objects and should behave asynchronously, yet sometimes they get stuck on send. If I enable full duplex on each connection everything works fine, even though I never use a connection for both sending and receiving. Can anyone explain this behaviour? Here is what I observed:

  1. 100 floats, full duplex disabled
    The code works; the sends complete without blocking, as expected from asynchronous behaviour.
  2. 100 floats, full duplex enabled
    The example works fine, as expected.
  3. 10000 floats, full duplex disabled
    The execution blocks forever, even though it was fine with the smaller data.
  4. 10000 floats, full duplex enabled
    Fine again.

Code (it's not my production code, it just illustrates what I mean):

from collections import deque
from multiprocessing import Process, Pipe
from numpy.random import randn
from os import getpid

PROC_NR = 4
DATA_POINTS = 100
# DATA_POINTS = 10000


def arg_passer(pipe_in, pipe_out, list_):
    my_pid = getpid()
    print "{}: Before send".format(my_pid)
    pipe_out.send(list_)
    print "{}: After send, before recv".format(my_pid)
    buf = pipe_in.recv()
    print "{}: After recv".format(my_pid)


if __name__ == "__main__":
    pipes = [Pipe(False) for _ in range(PROC_NR)]
    # pipes = [Pipe(True) for _ in range(PROC_NR)]
    pipes_in = deque(p[0] for p in pipes)
    pipes_out = deque(p[1] for p in pipes)
    # rotate the ends so the processes form a ring: each process sends
    # to its successor and receives from its predecessor
    pipes_in.rotate(1)
    pipes_out.rotate(-1)

    data = [randn(DATA_POINTS) for foo in xrange(PROC_NR)]
    processes = [Process(target=arg_passer, args=(pipes_in[foo], pipes_out[foo], data[foo]))
                 for foo in xrange(PROC_NR)]

    for proc in processes:
        proc.start()

    for proc in processes:
        proc.join()
asked Apr 21 '13 by Michal


1 Answer

First of all, it's worth noting the implementation of the multiprocessing.Pipe() function...

def Pipe(duplex=True):
    '''
    Returns pair of connection objects at either end of a pipe
    '''
    if duplex:
        s1, s2 = socket.socketpair()
        s1.setblocking(True)
        s2.setblocking(True)
        c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
        c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
        s1.close()
        s2.close()
    else:
        fd1, fd2 = os.pipe()
        c1 = _multiprocessing.Connection(fd1, writable=False)
        c2 = _multiprocessing.Connection(fd2, readable=False)

    return c1, c2

The difference is that half-duplex 'Pipes' use an anonymous pipe, whereas full-duplex 'Pipes' actually use a Unix domain socket, since anonymous pipes are unidirectional by nature.
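
You can see that unidirectionality for yourself; a minimal sketch (the exact exception raised by sending on the read-only end may vary across Python versions):

from multiprocessing import Pipe

r, w = Pipe(False)   # half-duplex: r can only recv(), w can only send()
w.send('hello')
print r.recv()       # 'hello'
# r.send('back')     # would raise an error: the connection is read-only

a, b = Pipe(True)    # full-duplex: both ends can send() and recv()
a.send('ping')
print b.recv()       # 'ping'
b.send('pong')
print a.recv()       # 'pong'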

I'm not sure what you mean by the term "asynchronous" in this context. If you mean "non-blocking I/O" then it's worth noting that both implementations use blocking I/O by default.


Secondly, it's worth noting the pickled size of the data you're trying to send...

>>> from numpy.random import randn
>>> from cPickle import dumps
>>> len(dumps(randn(100)))
2479
>>> len(dumps(randn(10000)))
237154

Thirdly, from the pipe(7) manpage...

Pipe Capacity

A pipe has a limited capacity. If the pipe is full, then a write(2) will block or fail, depending on whether the O_NONBLOCK flag is set (see below). Different implementations have different limits for the pipe capacity. Applications should not rely on a particular capacity: an application should be designed so that a reading process consumes data as soon as it is available, so that a writing process does not remain blocked.

In Linux versions before 2.6.11, the capacity of a pipe was the same as the system page size (e.g., 4096 bytes on i386). Since Linux 2.6.11, the pipe capacity is 65536 bytes.
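
You can verify the capacity on your own machine by filling a raw pipe in non-blocking mode; a rough sketch, assuming a Unix-like OS (fcntl isn't available on Windows):

import errno
import fcntl
import os

fd_r, fd_w = os.pipe()
# switch the write end to non-blocking so write() fails instead of blocking
flags = fcntl.fcntl(fd_w, fcntl.F_GETFL)
fcntl.fcntl(fd_w, fcntl.F_SETFL, flags | os.O_NONBLOCK)

total = 0
try:
    while True:
        total += os.write(fd_w, 'x' * 4096)
except OSError as e:
    if e.errno != errno.EAGAIN:
        raise

print total  # 65536 on Linux 2.6.11 and later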


So, in effect, you've created a deadlock where all the subprocesses have blocked on the pipe_out.send() call, and none of them can receive any data from the other processes, because you're sending all 237,154 bytes of data in one hit, which has filled the 65,536 byte buffer.

You might be tempted just to use the Unix domain socket version, but the only reason it works at present is that it has a larger buffer size than a pipe, and you'll find that solution will also fail if you increase DATA_POINTS to 100,000.

The "quick n' dirty hack" solution is to break the data into smaller chunks for sending, but it's not good practice to rely on the buffers being a specific size.

A better solution would be to use non-blocking I/O on the pipe_out.send() call, although I'm not familiar enough with the multiprocessing module to determine the best way to achieve it using that module.

The pseudocode would be along the lines of...

while 1:
    if we have sent all data and received all data:
        break
    send as much data as we can without blocking
    receive as much data as we can without blocking
    if we didn't send or receive anything in this iteration:
        sleep for a bit so we don't waste CPU time
        continue

...or you can use the Python select module to avoid sleeping for longer than is necessary, but, again, integrating it with multiprocessing.Pipe might be tricky.
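
One way it might look, since Connection objects expose fileno() and can be passed to select() on Unix (an untested sketch; pass_chunks is an illustrative name, and it still relies on each individual message being small enough that send() returns promptly):

import select

def pass_chunks(pipe_in, pipe_out, chunks):
    # chunks: a list of small arrays, each pickling to well under the
    # pipe capacity, so an individual send() shouldn't block for long;
    # assumes we receive as many chunks as we send, as in the ring example
    to_send = list(chunks)
    received = []
    while to_send or len(received) < len(chunks):
        if to_send:
            pipe_out.send(to_send.pop(0))
        # block until the incoming pipe has data (or the timeout expires)
        # rather than sleeping for a fixed interval
        ready, _, _ = select.select([pipe_in], [], [], 0.1)
        if ready:
            received.append(pipe_in.recv())
    return received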

It's possible that the multiprocessing.Queue class does all this for you, but I've never used it before, so you'd have to do some experiments.
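
For what it's worth, a Queue-based version of the ring might look like this (an untested sketch; Queue.put() hands the data to a background feeder thread and returns immediately, which should sidestep the deadlock):

from multiprocessing import Process, Queue
from numpy.random import randn

PROC_NR = 4
DATA_POINTS = 10000

def arg_passer_q(q_in, q_out, list_):
    q_out.put(list_)  # returns at once; a feeder thread does the actual send
    buf = q_in.get()

if __name__ == "__main__":
    data = [randn(DATA_POINTS) for _ in xrange(PROC_NR)]
    queues = [Queue() for _ in xrange(PROC_NR)]
    # process i reads from queues[i] and writes to queues[i + 1], so the
    # processes form the same ring as in the original example
    processes = [Process(target=arg_passer_q,
                         args=(queues[i], queues[(i + 1) % PROC_NR], data[i]))
                 for i in xrange(PROC_NR)]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()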

answered by Aya