Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

What could make a connection.send() block ? (from conn1, conn2 = multiprocessing.Pipe() )

I am debugging application that gather information from 2 sensors : a webcam and a microphone.

The general architecture is quite simple :

  • the main process sends messages (start, stop, get_data) via pipes to the child processes (one for each).
  • child processes gather the data and send it to the main process

Child & main processes are in infinite loops to process commands (the main process from the user, the child process from the main process).

It globally works but I have trouble stopping the child processes.

I have logged the code and it seems to happen 2 things :

  1. The 'stop' message is sent but doesn't get through the pipe.
  2. The child process continue to send data and the conn.send(data) blocks.

The behavior is clearly linked to the state of the connection, as child processes that send nothing back don't have this behavior. Still, I don't see how to debug/modify the current architecture which seems reasonnable.

So, what cause this blocking behavior and how to avoid it ?

This is the code which is executed for each iteration of the infinite loop in the child process :

    def do(self):
        while self.cnx.poll():
            msg = self.cnx.recv()
            self.queue.append(msg)
        #==
        if not self.queue:
            func_name = 'default_action'
            self.queue.append([func_name, ])
        #==
        msg             = self.queue.pop()
        func_name, args = msg[0], msg[1:]
        #==
        res = self.target.__getattribute__(func_name)(*args)
        #==
        running = func_name != 'stop'
        #==
        if res and self.send:
            assert running
            self.output_queue.append(res[0])
        if self.output_queue and running:
            self.cnx.send(self.output_queue.popleft())
        #==
        return running

update : it seems that the Pipe cannot be written simultaneously on both end. It works if change the last few lines of the above code to :

        if self.output_queue and running:
            if not self.cnx.poll(): 
                self.cnx.send(self.output_queue.popleft())

The question stays open though as Pipe are documented as full duplex by default and this behavior is not documented at all. I must have misunderstood something. Please, enlight me!

update 2 : just to be clear, no connection is closed during in this situation. To describe the sequence of events :

  • the main process sends a messsage ("stop") (it empties the connection before sending the message)
  • the main process enter an (infinite) loop that stops when the child process is terminated.
  • meanwhile, the child process is blocked in the send and never gets the message.
like image 893
LBarret Avatar asked Sep 02 '12 14:09

LBarret


1 Answers

A full duplex multiprocessing.Pipe is implemented as socketpair(). Calling .send can block for all the normal reasons when talking to a socket. Based on your description I think it's likely that the reader of your Pipe has quit reading and data has built up in the buffers in the kernel to the point where your .send blocks.

If you explicitly .close the receiving side you'll probably get some kind of error (although possibly SIGPIPE as well, not sure) when you try to .send. If your receiving connection was going out of scope this would probably happen automatically. You may be able to fix the problem by just being more careful not to store references (direct or indirect) to the receiving side so it gets deallocated when that thread goes away.

Trivial demo of blocking .send:

import multiprocessing

a, b = multiprocessing.Pipe()

while True:
    print "send!"
    a.send("hello world")

Now note that after a while it quits printing "send!"

like image 187
Ben Jackson Avatar answered Nov 05 '22 05:11

Ben Jackson