I am trying to write a class that calculates checksums using multiple processes, thereby taking advantage of multiple cores. I have a fairly simple class for this, and it works great in the simple case. But whenever I create two or more instances of the class, the worker never exits. It seems like it never gets the message that the pipe has been closed by the parent.
All the code can be found below. I first calculate the md5 and sha1 checksums separately, which works, and then I try to perform the calculation in parallel, and then the program locks up when it is time to close the pipe.
What is going on here? Why aren't the pipes working as I expect? I guess I could work around it by sending a "Stop" message on the queue and making the child quit that way, but I'd really like to know why this doesn't work as it is.

    import multiprocessing
    import hashlib

    class ChecksumPipe(multiprocessing.Process):
        def __init__(self, csname):
            multiprocessing.Process.__init__(self, name = csname)
            self.summer = eval("hashlib.%s()" % csname)
            self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False)
            self.result_queue = multiprocessing.Queue(1)
            self.daemon = True
            self.start()
            self.child_conn.close() # This is the parent. Close the unused end.

        def run(self):
            self.parent_conn.close() # This is the child. Close unused end.
            while True:
                try:
                    print "Waiting for more data...", self
                    block = self.child_conn.recv_bytes()
                    print "Got some data...", self
                except EOFError:
                    print "Finished work", self
                    break
                self.summer.update(block)
            self.result_queue.put(self.summer.hexdigest())
            self.result_queue.close()
            self.child_conn.close()

        def update(self, block):
            self.parent_conn.send_bytes(block)

        def hexdigest(self):
            self.parent_conn.close()
            return self.result_queue.get()

    def main():
        # Calculating the first checksum works
        md5 = ChecksumPipe("md5")
        md5.update("hello")
        print "md5 is", md5.hexdigest()

        # Calculating the second checksum works
        sha1 = ChecksumPipe("sha1")
        sha1.update("hello")
        print "sha1 is", sha1.hexdigest()

        # Calculating both checksums in parallel causes a lockup!
        md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1")
        md5.update("hello")
        sha1.update("hello")
        print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest() # Lockup here!

    main()

PS. This problem has been solved. Here is a working version of the above code, if anyone is interested:

    import multiprocessing
    import hashlib

    class ChecksumPipe(multiprocessing.Process):

        all_open_parent_conns = []

        def __init__(self, csname):
            multiprocessing.Process.__init__(self, name = csname)
            self.summer = eval("hashlib.%s()" % csname)
            self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False)
            ChecksumPipe.all_open_parent_conns.append(self.parent_conn)
            self.result_queue = multiprocessing.Queue(1)
            self.daemon = True
            self.start()
            self.child_conn.close() # This is the parent. Close the unused end.

        def run(self):
            for conn in ChecksumPipe.all_open_parent_conns:
                conn.close() # This is the child. Close unused ends.
            while True:
                try:
                    print "Waiting for more data...", self
                    block = self.child_conn.recv_bytes()
                    print "Got some data...", self
                except EOFError:
                    print "Finished work", self
                    break
                self.summer.update(block)
            self.result_queue.put(self.summer.hexdigest())
            self.result_queue.close()
            self.child_conn.close()

        def update(self, block):
            self.parent_conn.send_bytes(block)

        def hexdigest(self):
            self.parent_conn.close()
            return self.result_queue.get()

    def main():
        # Calculating the first checksum works
        md5 = ChecksumPipe("md5")
        md5.update("hello")
        print "md5 is", md5.hexdigest()

        # Calculating the second checksum works
        sha1 = ChecksumPipe("sha1")
        sha1.update("hello")
        print "sha1 is", sha1.hexdigest()

        # Calculating both checksums also works fine now
        md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1")
        md5.update("hello")
        sha1.update("hello")
        print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest()

    main()

Yep, that is surprising behaviour indeed.
However, if you look at the output of lsof for the two parallel child processes, it is easy to notice that the second child process has more file descriptors open.
What happens is that when the two parallel child processes are started, the second child inherits the parent's pipes, including the write end of the first child's pipe. So when the parent calls self.parent_conn.close(), the second child still has that pipe file descriptor open, and the pipe's file description is not closed in the kernel (its reference count is still greater than 0). As a result, self.child_conn.recv_bytes() in the first parallel child process never read()s EOF, and EOFError never gets raised.
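The reference-counting behaviour described above can be reproduced in a single process, without forking. The following is a minimal Python 3 sketch for POSIX systems, not code from the question: os.dup() stands in for the copy of the write end that a forked sibling would inherit, and the function name is hypothetical. The read end is made non-blocking so that "no EOF yet" shows up as BlockingIOError instead of a hang.

```python
import os

def eof_needs_all_writers_closed():
    """Show that a pipe reader sees EOF only once every write descriptor is closed."""
    read_fd, write_fd = os.pipe()
    # Assumption for illustration: this duplicate plays the role of the
    # write end that a second forked child would inherit from the parent.
    inherited_fd = os.dup(write_fd)
    # Non-blocking, so an empty pipe raises BlockingIOError instead of hanging.
    os.set_blocking(read_fd, False)

    os.close(write_fd)  # the "parent" closes its own write end
    try:
        eof_before = os.read(read_fd, 1) == b""
    except BlockingIOError:
        # No data and no EOF: another write descriptor is still open.
        eof_before = False

    os.close(inherited_fd)  # the last remaining write descriptor goes away
    eof_after = os.read(read_fd, 1) == b""  # an empty read now means EOF
    os.close(read_fd)
    return eof_before, eof_after
```

Calling eof_needs_all_writers_closed() should report no EOF after the first close and EOF only after the duplicate is closed too, which is the same situation the first child is stuck in while its sibling holds the inherited descriptor.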
You may need to send an explicit shutdown message rather than just closing file descriptors, because there seems to be little control over which file descriptors get shared between which processes (there is no close-on-fork file descriptor flag).
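An explicit shutdown message could look something like the sketch below. It is written for Python 3 rather than the Python 2 of the question, and several things in it are assumptions for illustration: the names ChecksumWorker, _worker and STOP are hypothetical, None is used as the sentinel, blocks are sent with send()/recv() so the sentinel can be told apart from data, and the "fork" start method is requested explicitly to mirror the descriptor inheritance discussed above (so this is POSIX-only).

```python
import hashlib
import multiprocessing

STOP = None  # hypothetical sentinel meaning "no more data"

def _worker(algorithm, conn, result_queue):
    """Child process: hash incoming blocks until the sentinel arrives."""
    summer = hashlib.new(algorithm)
    while True:
        block = conn.recv()
        if block is STOP:
            break  # explicit shutdown; does not rely on seeing EOF
        summer.update(block)
    result_queue.put(summer.hexdigest())

class ChecksumWorker:
    """Hypothetical variant of ChecksumPipe that stops on a message, not on EOF."""

    def __init__(self, algorithm):
        # Assumption: "fork" is chosen so siblings inherit each other's pipe
        # descriptors, as in the question; the sentinel works regardless.
        ctx = multiprocessing.get_context("fork")
        recv_conn, self._send_conn = ctx.Pipe(duplex=False)
        self._results = ctx.Queue(1)
        self._process = ctx.Process(
            target=_worker,
            args=(algorithm, recv_conn, self._results),
            daemon=True,
        )
        self._process.start()

    def update(self, block):
        self._send_conn.send(block)  # block must be a bytes object

    def hexdigest(self):
        self._send_conn.send(STOP)  # tell the child to finish
        digest = self._results.get()
        self._process.join()
        return digest
```

With this shape, creating ChecksumWorker("md5") and ChecksumWorker("sha1") side by side, feeding both b"hello" and then calling hexdigest() on each should return without depending on which process still holds which pipe descriptor.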