I want to use a multiprocessing.Manager() object so that a worker can asynchronously hand data off to the manager, which then sends it to a server. What I have is roughly 10 instances writing PDFs to disk. I then wanted to use the manager object in the multiprocessing package to send that data to my S3 bucket, because I do not want to hold up the local content generation.
So I was wondering: if I create a custom manager object, is this the proper way to do this? Will each process submitted to the manager object get queued? Or, if I call multiple uploads, will the manager drop some of the calls?
Below is a sample code of what I am thinking of doing:
from multiprocessing.managers import BaseManager

class UploadClass(object):
    def upload(self, filePath, params, destUrl):
        # do stuff
        return results

class MyManager(BaseManager):
    pass

MyManager.register('uploads', UploadClass)

if __name__ == '__main__':
    manager = MyManager()
    manager.start()
    upload = manager.uploads()
    # do these wait for completion, or do they perform this async?
    print(upload.upload(r"< path >", {...}, "some url"))
    print(upload.upload(r"< path >", {...}, "some url"))
To directly answer some of your questions:
Will each process submitted to the manager object get queued?
The Manager server spawns a new thread to handle each incoming request, so all of your requests will start being handled immediately. You can see this inside multiprocessing/managers.py:
def serve_forever(self):
    '''
    Run the server forever
    '''
    current_process()._manager_server = self
    try:
        try:
            while 1:
                try:
                    c = self.listener.accept()
                except (OSError, IOError):
                    continue
                t = threading.Thread(target=self.handle_request, args=(c,))
                t.daemon = True
                t.start()
        except (KeyboardInterrupt, SystemExit):
            pass
    finally:
        self.stop = 999
        self.listener.close()
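To see those server threads in action, here is a small self-contained sketch (the SlowWorker class and its sleep are my stand-ins for a slow upload, not code from your question). Several client processes call the same managed object at once; because the Manager server handles each request in its own thread, the slow calls overlap instead of running one after another:

```python
import time
from multiprocessing import Process
from multiprocessing.managers import BaseManager

class SlowWorker(object):
    def work(self, delay):
        # Stand-in for a slow upload: just sleep, then report the delay.
        time.sleep(delay)
        return delay

class SlowManager(BaseManager):
    pass

SlowManager.register('SlowWorker', SlowWorker)

def client(proxy, delay):
    # Runs in a separate process; blocks until the manager finishes the call.
    proxy.work(delay)

def demo():
    manager = SlowManager()
    manager.start()
    worker = manager.SlowWorker()  # one shared instance behind the proxy
    start = time.time()
    procs = [Process(target=client, args=(worker, 1.0)) for _ in range(3)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    elapsed = time.time() - start
    manager.shutdown()
    return elapsed

if __name__ == '__main__':
    # Three 1-second calls overlap in the manager's threads, so this
    # finishes in roughly 1 second instead of 3.
    print('elapsed: %.2f seconds' % demo())
```

If the server handled requests serially instead, the three calls would take at least 3 seconds total.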
if I call multiple uploads, will the manager drop some of the calls?
No, none of the calls will be dropped.
# do these wait for completion, or do they perform this async?
print(upload.upload(r"< path >", {...}, "some url"))
print(upload.upload(r"< path >", {...}, "some url"))
Both of the calls to upload.upload will be synchronous; they won't return until UploadClass.upload has completed. However, if you were to have multiple scripts/threads/processes calling upload.upload concurrently, each unique call would be happening concurrently inside its own thread in the Manager server process.
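If you want the calling side itself to stay responsive, one option (my sketch, not part of your code; the Uploader class and its sleep are placeholders for a real transfer) is to issue each proxy call from its own local thread. Each thread blocks on its call while the main thread keeps working:

```python
import threading
import time
from multiprocessing.managers import BaseManager

class Uploader(object):
    def upload(self, name):
        time.sleep(0.4)  # stand-in for a slow network transfer
        return 'done:' + name

class UploadManager(BaseManager):
    pass

UploadManager.register('Uploader', Uploader)

def call_upload(proxy, name, out):
    # Each thread blocks on its own proxy call; the others keep running.
    out.append(proxy.upload(name))

def run():
    manager = UploadManager()
    manager.start()
    uploader = manager.Uploader()
    out = []
    start = time.time()
    threads = [threading.Thread(target=call_upload, args=(uploader, n, out))
               for n in ('a', 'b', 'c')]
    for t in threads:
        t.start()
    # The main thread is free to keep writing PDFs here while uploads run.
    for t in threads:
        t.join()
    elapsed = time.time() - start
    manager.shutdown()
    return out, elapsed

if __name__ == '__main__':
    print(run())
```

Sharing one proxy across threads is fine here: each thread gets its own connection to the manager server, so the three 0.4-second calls overlap rather than taking 1.2 seconds back to back.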
And your most important question:
is this the proper way to do this?
I would say no, if I understand the question properly. If you just have one script, and spawn ten multiprocessing.Process instances inside that one script to write out the PDFs, then you should just use another multiprocessing.Process to handle the uploads:
import multiprocessing

def upload(q):
    for payload in iter(q.get, None):  # keep getting from the queue until a None is found
        filePath, params, destUrl = payload
        # do stuff

def write_pdf(pdf_file_info, q):
    # write a pdf to disk here
    q.put((filePath, params, destUrl))  # send work to the uploader
    # Move on with whatever comes next.

if __name__ == '__main__':
    pdf_queue = multiprocessing.Queue()

    # Start the uploader
    upload_proc = multiprocessing.Process(target=upload, args=(pdf_queue,))
    upload_proc.start()

    # Start the pdf writers
    procs = []
    for pdf in pdfs_to_write:
        p = multiprocessing.Process(target=write_pdf, args=(pdf, pdf_queue))
        p.start()
        procs.append(p)

    # Wait for the pdf writers to finish, then stop the uploader.
    for p in procs:
        p.join()
    pdf_queue.put(None)  # sending None breaks the for loop inside upload
    upload_proc.join()
If you're actually ok with concurrent uploads, then there's no need to have a separate upload process at all - just upload from the pdf writing processes directly.
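That simpler shape might look like the following sketch. write_pdf_to_disk and upload_to_s3 are hypothetical placeholders standing in for your real PDF generation and S3 code (e.g. boto); the results queue is only there so the parent can collect what happened:

```python
import multiprocessing

def write_pdf_to_disk(pdf_info):
    # Hypothetical placeholder for your real PDF generation; returns a path.
    return '/tmp/%s.pdf' % pdf_info

def upload_to_s3(filepath):
    # Hypothetical placeholder for your real S3 upload call.
    return 'uploaded:' + filepath

def write_and_upload(pdf_info, results):
    # Each writer process uploads its own file as soon as it is written,
    # so uploads from different processes overlap naturally.
    path = write_pdf_to_disk(pdf_info)
    results.put(upload_to_s3(path))

def run(pdfs):
    results = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=write_and_upload, args=(pdf, results))
             for pdf in pdfs]
    for p in procs:
        p.start()
    out = [results.get() for _ in pdfs]  # drain before joining to avoid a full-pipe hang
    for p in procs:
        p.join()
    return out

if __name__ == '__main__':
    print(run(['report1', 'report2', 'report3']))
```

Note that the parent drains the queue before joining the writers; joining a process that still has queued data buffered can hang, so get first, join second.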
It's hard to tell from your question if this is exactly what you're doing, though. Once you clarify, I'll adjust this last piece to fit your specific use-case.