I am debugging application that gather information from 2 sensors : a webcam and a microphone.
The general architecture is quite simple :
Child & main processes are in infinite loops to process commands (the main process from the user, the child process from the main process).
It globally works but I have trouble stopping the child processes.
I have logged the code and it seems to happen 2 things :
The behavior is clearly linked to the state of the connection, as child processes that send nothing back don't have this behavior. Still, I don't see how to debug/modify the current architecture which seems reasonnable.
So, what cause this blocking behavior and how to avoid it ?
This is the code which is executed for each iteration of the infinite loop in the child process :
def do(self):
while self.cnx.poll():
msg = self.cnx.recv()
self.queue.append(msg)
#==
if not self.queue:
func_name = 'default_action'
self.queue.append([func_name, ])
#==
msg = self.queue.pop()
func_name, args = msg[0], msg[1:]
#==
res = self.target.__getattribute__(func_name)(*args)
#==
running = func_name != 'stop'
#==
if res and self.send:
assert running
self.output_queue.append(res[0])
if self.output_queue and running:
self.cnx.send(self.output_queue.popleft())
#==
return running
update : it seems that the Pipe cannot be written simultaneously on both end. It works if change the last few lines of the above code to :
if self.output_queue and running:
if not self.cnx.poll():
self.cnx.send(self.output_queue.popleft())
The question stays open though as Pipe are documented as full duplex by default and this behavior is not documented at all. I must have misunderstood something. Please, enlight me!
update 2 : just to be clear, no connection is closed during in this situation. To describe the sequence of events :
A full duplex multiprocessing.Pipe
is implemented as socketpair()
. Calling .send
can block for all the normal reasons when talking to a socket. Based on your description I think it's likely that the reader of your Pipe
has quit reading and data has built up in the buffers in the kernel to the point where your .send
blocks.
If you explicitly .close
the receiving side you'll probably get some kind of error (although possibly SIGPIPE
as well, not sure) when you try to .send
. If your receiving connection was going out of scope this would probably happen automatically. You may be able to fix the problem by just being more careful not to store references (direct or indirect) to the receiving side so it gets deallocated when that thread goes away.
Trivial demo of blocking .send
:
import multiprocessing
a, b = multiprocessing.Pipe()
while True:
print "send!"
a.send("hello world")
Now note that after a while it quits printing "send!"
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With