Using Python multiprocessing pipes

I am trying to write a class that calculates checksums using multiple processes, thereby taking advantage of multiple cores. I have a fairly simple class for this, and it works great in the simple case. But whenever I create two or more instances of the class, the worker never exits. It seems like it never gets the message that the pipe has been closed by the parent.

All the code can be found below. I first calculate the md5 and sha1 checksums separately, which works, and then I try to perform the two calculations in parallel; at that point the program locks up when it is time to close the pipe.

What is going on here? Why aren't the pipes working as I expect? I guess I could work around this by sending a "Stop" message on the queue and making the child quit that way, but I'd really like to know why this isn't working as it is.

import multiprocessing
import hashlib

class ChecksumPipe(multiprocessing.Process):
    def __init__(self, csname):
        multiprocessing.Process.__init__(self, name = csname)
        self.summer = eval("hashlib.%s()" % csname)
        self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False)
        self.result_queue = multiprocessing.Queue(1)
        self.daemon = True
        self.start()
        self.child_conn.close() # This is the parent. Close the unused end.

    def run(self):
        self.parent_conn.close() # This is the child. Close unused end.
        while True:
            try:
                print "Waiting for more data...", self
                block = self.child_conn.recv_bytes()
                print "Got some data...", self
            except EOFError:
                print "Finished work", self
                break
            self.summer.update(block)
        self.result_queue.put(self.summer.hexdigest())
        self.result_queue.close()
        self.child_conn.close()

    def update(self, block):
        self.parent_conn.send_bytes(block)

    def hexdigest(self):
        self.parent_conn.close()
        return self.result_queue.get()


def main():
    # Calculating the first checksum works
    md5 = ChecksumPipe("md5")
    md5.update("hello")
    print "md5 is", md5.hexdigest()

    # Calculating the second checksum works
    sha1 = ChecksumPipe("sha1")
    sha1.update("hello")
    print "sha1 is", sha1.hexdigest()

    # Calculating both checksums in parallel causes a lockup!
    md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1")
    md5.update("hello")
    sha1.update("hello")
    print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest() # Lockup here!

main()

PS. This problem has been solved. Here is a working version of the above code, if anyone is interested:

import multiprocessing
import hashlib

class ChecksumPipe(multiprocessing.Process):

    all_open_parent_conns = []

    def __init__(self, csname):
        multiprocessing.Process.__init__(self, name = csname)
        self.summer = eval("hashlib.%s()" % csname)
        self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False)
        ChecksumPipe.all_open_parent_conns.append(self.parent_conn)
        self.result_queue = multiprocessing.Queue(1)
        self.daemon = True
        self.start()
        self.child_conn.close() # This is the parent. Close the unused end.

    def run(self):
        for conn in ChecksumPipe.all_open_parent_conns:
            conn.close() # This is the child. Close unused ends.
        while True:
            try:
                print "Waiting for more data...", self
                block = self.child_conn.recv_bytes()
                print "Got some data...", self
            except EOFError:
                print "Finished work", self
                break
            self.summer.update(block)
        self.result_queue.put(self.summer.hexdigest())
        self.result_queue.close()
        self.child_conn.close()

    def update(self, block):
        self.parent_conn.send_bytes(block)

    def hexdigest(self):
        self.parent_conn.close()
        return self.result_queue.get()

def main():
    # Calculating the first checksum works
    md5 = ChecksumPipe("md5")
    md5.update("hello")
    print "md5 is", md5.hexdigest()

    # Calculating the second checksum works
    sha1 = ChecksumPipe("sha1")
    sha1.update("hello")
    print "sha1 is", sha1.hexdigest()

    # Calculating both checksums also works fine now
    md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1")
    md5.update("hello")
    sha1.update("hello")
    print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest()

main()
Asked Oct 29 '11 by Mats Ekberg

1 Answer

Yep, that is surprising behaviour indeed.

However, if you look at the output of lsof for the two parallel child processes, it is easy to notice that the second child process has more file descriptors open.
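
If lsof is not handy, roughly the same picture can be obtained from inside each child via /proc (Linux only; dump_open_fds is a hypothetical helper you would call at the top of run() in both children, not something from the original code):

import os

def dump_open_fds(tag):
    # Linux only: /proc/self/fd lists the calling process's open descriptors.
    # (The listing itself briefly opens one extra descriptor for reading /proc.)
    fds = sorted(int(fd) for fd in os.listdir("/proc/self/fd"))
    print tag, "has open file descriptors:", fds

Calling it from the two parallel workers shows the second one holding extra, inherited pipe descriptors.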

What happens is that when the two parallel child processes are started, the second child inherits the parent's open pipe ends, including the first pipe's write end. So when the parent calls self.parent_conn.close(), the second child still has that pipe file descriptor open, the underlying pipe never gets closed in the kernel (its reference count stays above zero), and as a result self.child_conn.recv_bytes() in the first parallel child process never reads EOF and EOFError is never raised.
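
For what it is worth, the effect can be reproduced without the checksum class at all. Below is a minimal sketch (hypothetical names, assuming the fork start method used on Linux): a second child forked while the pipe's write end is still open in the parent keeps the reader blocked even after the parent closes its own copy.

import multiprocessing
import time

recv_end, send_end = multiprocessing.Pipe(duplex = False)

def reader():
    send_end.close()          # the reader drops its own inherited copy of the write end
    try:
        recv_end.recv_bytes() # blocks until data arrives or every write end is closed
    except EOFError:
        print "reader saw EOF"

def sleeper():
    time.sleep(5)             # holds its inherited copy of send_end open meanwhile

first = multiprocessing.Process(target = reader)
first.start()
second = multiprocessing.Process(target = sleeper) # forked after Pipe(), so it inherits send_end too
second.start()

send_end.close()              # the parent's copy is now closed...
first.join(2)
print "reader finished after 2s?", not first.is_alive() # ...but the reader is still blocked
second.join()                 # once sleeper exits, its copy is gone as well...
first.join()
print "reader finished now?", not first.is_alive()      # ...and the reader finally sees EOF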

You may need to send an explicit shutdown message rather than just closing file descriptors, because there seems to be little control over which file descriptors get shared between which processes (there is no close-on-fork file descriptor flag).
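
As a rough sketch of that explicit-shutdown idea (same Python 2 / Linux assumptions as the question; ChecksumWorker and the "__STOP__" marker are made up for illustration, not part of the original code), the child can stop on a sentinel value instead of waiting for EOF, which makes any leaked write ends in sibling processes harmless:

import multiprocessing
import hashlib

STOP = "__STOP__" # hypothetical sentinel; any value that cannot occur as real data would do

class ChecksumWorker(multiprocessing.Process):
    def __init__(self, csname):
        multiprocessing.Process.__init__(self, name = csname)
        self.summer = hashlib.new(csname)
        self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False)
        self.result_queue = multiprocessing.Queue(1)
        self.daemon = True
        self.start()

    def run(self):
        # No reliance on EOF here, so inherited copies of other pipes do no harm.
        while True:
            block = self.child_conn.recv_bytes()
            if block == STOP:
                break
            self.summer.update(block)
        self.result_queue.put(self.summer.hexdigest())

    def update(self, block):
        self.parent_conn.send_bytes(block)

    def hexdigest(self):
        self.parent_conn.send_bytes(STOP) # explicit shutdown message instead of close()
        return self.result_queue.get()

md5, sha1 = ChecksumWorker("md5"), ChecksumWorker("sha1")
md5.update("hello")
sha1.update("hello")
print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest()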

Answered Oct 20 '22 by Maxim Egorushkin