 

How to use multiprocessing with class instances in Python?

I am trying to create a class that can run a separate process to go do some work that takes a long time, launch a bunch of these from a main module, and then wait for them all to finish. I want to launch the processes once and then keep feeding them things to do rather than creating and destroying processes. For example, maybe I have 10 servers running the dd command, and then I want them all to scp a file, etc.

My ultimate goal is to create a class for each system that keeps track of the information for the system it is tied to, like IP address, logs, runtime, etc. But that class must be able to launch a system command and then return execution back to the caller while that system command runs, so it can follow up on the result of the system command later.

My attempt is failing because I cannot send an instance method of a class over the pipe to the subprocess via pickle. Those are not pickleable. I therefore tried to fix it various ways but I can't figure it out. How can my code be patched to do this? What good is multiprocessing if you can't send over anything useful?

Is there any good documentation of multiprocessing being used with class instances? The only way I can get the multiprocessing module to work is on simple functions. Every attempt to use it within a class instance has failed. Maybe I should pass events instead? I don't understand how to do that yet.

import multiprocessing
import sys
import re


class ProcessWorker(multiprocessing.Process):
    """
    This class runs as a separate process to execute worker's commands in parallel
    Once launched, it remains running, monitoring the task queue, until "None" is sent
    """

    def __init__(self, task_q, result_q):
        multiprocessing.Process.__init__(self)
        self.task_q = task_q
        self.result_q = result_q
        return

    def run(self):
        """
        Overloaded function provided by multiprocessing.Process. Called upon start() signal
        """
        proc_name = self.name
        print '%s: Launched' % (proc_name)
        while True:
            next_task_list = self.task_q.get()
            if next_task_list is None:
                # Poison pill means shutdown
                print '%s: Exiting' % (proc_name)
                self.task_q.task_done()
                break
            next_task = next_task_list[0]
            args = next_task_list[1]
            kwargs = next_task_list[2]
            print '%s: %s' % (proc_name, next_task)
            answer = next_task(*args, **kwargs)
            self.task_q.task_done()
            self.result_q.put(answer)
        return
# End of ProcessWorker class


class Worker(object):
    """
    Launches a child process to run commands from derived classes in separate processes,
    which sit and listen for something to do
    This base class is called by each derived worker
    """
    def __init__(self, config, index=None):
        self.config = config
        self.index = index

        # Launch the ProcessWorker for anything that has an index value
        if self.index is not None:
            self.task_q = multiprocessing.JoinableQueue()
            self.result_q = multiprocessing.Queue()

            self.process_worker = ProcessWorker(self.task_q, self.result_q)
            self.process_worker.start()
            print "Got here"
            # Process should be running and listening for functions to execute
        return

    def enqueue_process(target):  # No self, since it is a decorator
        """
        Used to place a command target from this class object into the task_q
        NOTE: Any function decorated with this must use fetch_results() to get the
        target task's result value
        """
        def wrapper(self, *args, **kwargs):
            self.task_q.put([target, args, kwargs])  # FAIL: target is a class instance method and can't be pickled!
        return wrapper

    def fetch_results(self):
        """
        After all processes have been spawned by multiple modules, this command
        is called on each one to retrieve the results of the call.
        This blocks until the execution of the item in the queue is complete
        """
        self.task_q.join()                          # Wait for it to finish
        return self.result_q.get()                  # Return the result

    @enqueue_process
    def run_long_command(self, command):
        print "I am running command: %s" % (command)

        # In here, I will launch a subprocess to run a long-running system command
        # p = Popen(command), etc
        # p.wait(), etc
        return

    def close(self):
        self.task_q.put(None)
        self.task_q.join()


if __name__ == '__main__':
    config = ["some value", "something else"]
    index = 7
    workers = []
    for i in range(5):
        worker = Worker(config, index)
        worker.run_long_command("ls /")
        workers.append(worker)
    for worker in workers:
        worker.fetch_results()

    # Do more work... (this would actually be done in a distributor in another class)

    for worker in workers:
        worker.close()
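For what it's worth, the failure does not need any of my classes to reproduce. Under Python 2 (which the print statements above imply), a bound method simply refuses to pickle, and a multiprocessing queue pickles everything it transmits. A minimal sketch with made-up names:

import pickle

class Example(object):
    def method(self):
        return 42

if __name__ == '__main__':
    e = Example()
    # Under Python 2 this raises an error complaining that instance methods
    # cannot be pickled. multiprocessing queues pickle everything they send,
    # so putting a bound method (or a list containing one) on task_q hits
    # the same wall. (Python 3 can pickle bound methods, but only by also
    # pickling the owning instance, so an object holding live queues would
    # still fail.)
    pickle.dumps(e.method)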

Edit: I tried to move the ProcessWorker class and the creation of the multiprocessing queues outside of the Worker class and then tried to manually pickle the worker instance. Even that doesn't work, and I get this error:

RuntimeError: Queue objects should only be shared between processes through inheritance

But I am only passing references to those queues into the worker instance?? I am missing something fundamental. Here is the modified code from the main section:

if __name__ == '__main__':
    config = ["some value", "something else"]
    index = 7
    workers = []
    for i in range(1):
        task_q = multiprocessing.JoinableQueue()
        result_q = multiprocessing.Queue()
        process_worker = ProcessWorker(task_q, result_q)
        worker = Worker(config, index, process_worker, task_q, result_q)
        something_to_look_at = pickle.dumps(worker)  # FAIL: Doesn't like queues??
        process_worker.start()
        worker.run_long_command("ls /")
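To narrow it down, pickling a queue by itself reproduces the exact same message, so the queues the Worker holds appear to be the culprit. A minimal sketch, independent of my classes:

import multiprocessing
import pickle

if __name__ == '__main__':
    q = multiprocessing.Queue()
    # A Queue may only cross the process boundary by being handed to the
    # child as it is created (constructor arguments, inherited attributes),
    # which is exactly what ProcessWorker(task_q, result_q) already does.
    # Pickling it by hand afterwards raises the RuntimeError quoted above.
    try:
        pickle.dumps(q)
    except RuntimeError as err:
        print(err)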
asked Jan 05 '13 by David Lynch



1 Answer

So, the problem was that I was assuming Python does some sort of magic that is somehow different from the way C++/fork() works. I somehow thought that Python copied only the class into the separate process, not the whole program. I seriously wasted days trying to get this to work, because all of the talk about pickle serialization made me think that it actually sent everything over the pipe. I knew that certain things could not be sent over the pipe, but I thought my problem was that I was not packaging things up properly.

This all could have been avoided if the Python docs had given me a 10,000 ft view of what happens when this module is used. Sure, they tell me what the methods of the multiprocessing module do and give me some basic examples, but what I want to know is the "Theory of Operation" behind the scenes! Here is the kind of information I could have used. Please chime in if my answer is off. It will help me learn.

When you start a process using this module, the whole program is copied into another process. But since it is not the "__main__" process and my code was checking for that, it doesn't fire off yet another process infinitely. It just stops and sits out there waiting for something to do, like a zombie. Everything that was initialized in the parent at the time of calling multiprocessing.Process() is all set up and ready to go. Once you put something in the multiprocessing.Queue, or shared memory, or a pipe, etc. (however you are communicating), the separate process receives it and gets to work. It can draw upon all the imported modules and setup just as if it were the parent. However, once some internal state variables change in the parent or the separate process, those changes are isolated. Once the process is spawned, it becomes your job to keep the two in sync if necessary, through a queue, pipe, shared memory, etc.
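A tiny sketch of that isolation, with variable names of my own choosing:

import multiprocessing

counter = 0                         # each process ends up with its own copy of this

def child(q):
    global counter
    counter += 1                    # only the child's copy changes
    q.put(counter)                  # anything the parent should see must travel through the queue/pipe

if __name__ == '__main__':          # without this guard, a spawned child could re-run the spawning code
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=child, args=(q,))
    p.start()
    print(q.get())                  # 1 -- the child's view
    print(counter)                  # 0 -- the parent's copy is untouched
    p.join()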

I threw out the code and started over. Now the only extra function I put out in the ProcessWorker is an "execute" method that runs a command line. Pretty simple. I don't have to worry about launching and then closing a bunch of processes this way, which caused me all kinds of instability and performance problems in C++ in the past. When I switched to launching processes at the beginning and then passing messages to those waiting processes, my performance improved and it was very stable.
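The paragraph above describes the shape of the rewrite rather than the code itself, so here is a minimal sketch of what I mean. The names (CommandWorker, execute) are placeholders, not my production code:

import multiprocessing
import subprocess

class CommandWorker(multiprocessing.Process):
    """Started once; executes command strings from task_q until it sees None."""

    def __init__(self, task_q, result_q):
        multiprocessing.Process.__init__(self)
        self.task_q = task_q
        self.result_q = result_q

    def run(self):
        while True:
            command = self.task_q.get()
            if command is None:              # poison pill means shutdown
                self.task_q.task_done()
                break
            output = self.execute(command)
            self.task_q.task_done()
            self.result_q.put((command, output))

    def execute(self, command):
        # Only a plain string crossed the queue; the actual work happens
        # here, inside the child process.
        proc = subprocess.Popen(command, shell=True,
                                stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        out, _ = proc.communicate()
        return out

if __name__ == '__main__':
    task_q = multiprocessing.JoinableQueue()
    result_q = multiprocessing.Queue()
    worker = CommandWorker(task_q, result_q)
    worker.start()
    task_q.put("ls /")                       # strings pickle fine, unlike bound methods
    task_q.put(None)
    task_q.join()
    print(result_q.get())
    worker.join()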

BTW, I looked at this link for help, and it threw me off because the example made me think that methods were being transported across the queues: http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html The second example of the first section uses "next_task()", which appeared (to me) to be executing a task received via the queue.
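What actually travels across the queue in that example is a small picklable object, and next_task() runs inside the consumer process, using a class definition the child already has because it imported the same module. A stripped-down sketch in that spirit (my own names, not Doug Hellmann's code):

import multiprocessing

class Task(object):
    """Defined at module level, so pickle can find it by name in the child."""
    def __init__(self, a, b):
        self.a = a                    # only this data travels across the queue
        self.b = b
    def __call__(self):
        return self.a * self.b        # this code runs in the consumer process

def consumer(task_q, result_q):
    while True:
        next_task = task_q.get()
        if next_task is None:
            break
        # Looks like "executing a task from the queue", but it is the child
        # calling Task.__call__ on its own unpickled copy of the object.
        result_q.put(next_task())

if __name__ == '__main__':
    task_q = multiprocessing.Queue()
    result_q = multiprocessing.Queue()
    p = multiprocessing.Process(target=consumer, args=(task_q, result_q))
    p.start()
    task_q.put(Task(6, 7))
    task_q.put(None)
    print(result_q.get())             # 42
    p.join()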

answered Sep 19 '22 by David Lynch
