Properly Designing a Multiprocessing.Manager Custom Object

I want to use a multiprocessing.Manager() object so that a worker can asynchronously hand information to a manager, which then sends it on to a server. What I have is roughly 10 instances writing PDFs to disk. I then want to use a manager object from the multiprocessing package to send that data to my S3 bucket, because I do not want to hold up the local content generation.

So I was wondering: if I create a custom manager object, is this the proper way to do it? Will each job submitted to the manager object get queued? Or, if I call multiple uploads, will the manager drop some of the calls?

Below is sample code for what I am thinking of doing:

from multiprocessing.managers import BaseManager

class UploadClass(object):
    def upload(self, filePath, params, destUrl):
        # do stuff
        return results

class MyManager(BaseManager):
    pass

MyManager.register('uploads', UploadClass)

if __name__ == '__main__':
    manager = MyManager()
    manager.start()
    upload = manager.uploads()
    # Does this wait for completion, or do these calls run asynchronously?
    print(upload.upload(r"< path >", {...}, "some url"))
    print(upload.upload(r"< path >", {...}, "some url"))
asked May 12 '15 by code base 5000


1 Answer

To directly answer some of your questions:

Will each process submitted to the manager object get queued?

The Manager server spawns a new thread to handle each incoming request, so all of your requests will start being handled immediately. You can see this in multiprocessing/managers.py:

def serve_forever(self):
    '''
    Run the server forever
    '''
    current_process()._manager_server = self
    try:
        try:
            while 1:
                try:
                    c = self.listener.accept()
                except (OSError, IOError):
                    continue
                t = threading.Thread(target=self.handle_request, args=(c,))
                t.daemon = True
                t.start()
        except (KeyboardInterrupt, SystemExit):
            pass
    finally:
        self.stop = 999
        self.listener.close()

if I call multiple uploads, will the manager drop some of the calls?

No, none of the calls will be dropped.

# Does this wait for completion, or do these calls run asynchronously?
print(upload.upload(r"< path >", {...}, "some url"))
print(upload.upload(r"< path >", {...}, "some url"))

Both of the calls to upload.upload will be synchronous; they won't return until UploadClass.upload has completed. However, if you were to have multiple scripts/threads/processes calling upload.upload concurrently, each unique call would be happening concurrently inside its own thread in the Manager server process.
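
For example, here is a minimal sketch (reusing the UploadClass from your question, with hypothetical file paths and URL) that drives the same proxy from two worker processes. Each proxy call blocks its caller, but the two requests are serviced in parallel by separate threads inside the Manager server:

import multiprocessing
from multiprocessing.managers import BaseManager

class UploadClass(object):
    def upload(self, filePath, params, destUrl):
        # stand-in for the real upload work
        return "uploaded %s" % filePath

class MyManager(BaseManager):
    pass

MyManager.register('uploads', UploadClass)

def worker(proxy, path):
    # This call blocks the worker process, but the Manager server
    # handles each request in its own thread, so the calls overlap.
    print(proxy.upload(path, {}, "http://example.com"))  # hypothetical URL

if __name__ == '__main__':
    manager = MyManager()
    manager.start()
    upload = manager.uploads()
    procs = [multiprocessing.Process(target=worker, args=(upload, path))
             for path in ("a.pdf", "b.pdf")]  # hypothetical paths
    for p in procs:
        p.start()
    for p in procs:
        p.join()

Note that proxy objects are picklable, so they can be passed to child processes like this.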

And your most important question:

is this the proper way to do this?

I would say no, if I understand the question properly. If you just have one script, and then spawn ten multiprocessing.Process instances inside that one script to write out the PDFs, then you should just use another multiprocessing.Process to handle the uploads:

import multiprocessing

def upload(q):
    for payload in iter(q.get, None):  # Keep getting from the queue until a None is found
        filePath, params, destUrl = payload
        # do stuff

def write_pdf(pdf_file_info, q):
    # write a pdf to disk here
    q.put((filePath, params, destUrl))  # Send work to the uploader
    # Move on with whatever comes next.

if __name__ == '__main__':
    pdf_queue = multiprocessing.Queue()

    # Start uploader
    upload_proc = multiprocessing.Process(target=upload, args=(pdf_queue,))
    upload_proc.start()

    # Start pdf writers
    procs = []
    for pdf in pdfs_to_write:
        p = multiprocessing.Process(target=write_pdf, args=(pdf, pdf_queue))
        p.start()
        procs.append(p)

    # Wait for pdf writers and uploader to finish.
    for p in procs:
        p.join()
    pdf_queue.put(None)  # Sending None breaks the for loop inside upload
    upload_proc.join()

If you're actually ok with concurrent uploads, then there's no need for a separate upload process at all; just upload from the PDF-writing processes directly.
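
A minimal sketch of that variant, assuming hypothetical write_pdf_to_disk and upload_to_s3 helpers (neither is defined in your question):

import multiprocessing

def write_and_upload(pdf_file_info):
    filePath = write_pdf_to_disk(pdf_file_info)  # hypothetical helper
    upload_to_s3(filePath)                       # hypothetical helper

if __name__ == '__main__':
    procs = []
    for pdf in pdfs_to_write:  # same placeholder list as above
        p = multiprocessing.Process(target=write_and_upload, args=(pdf,))
        p.start()
        procs.append(p)
    for p in procs:
        p.join()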

It's hard to tell from your question if this is exactly what you're doing, though. Once you clarify, I'll adjust this last piece to fit your specific use-case.

answered Nov 02 '22 by dano