Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Apply a method to a list of objects in parallel using multi-processing

I have created a class with a number of methods. One of the methods is very time consuming, my_process, and I'd like to do that method in parallel. I came across Python Multiprocessing - apply class method to a list of objects but I'm not sure how to apply it to my problem, and what effect it will have on the other methods of my class.

class MyClass():
    def __init__(self, input):
        self.input = input
        self.result = int

    def my_process(self, multiply_by, add_to):
        self.result = self.input * multiply_by
        self._my_sub_process(add_to)
        return self.result

    def _my_sub_process(self, add_to):
        self.result += add_to

list_of_numbers = range(0, 5)
list_of_objects = [MyClass(i) for i in list_of_numbers]
list_of_results = [obj.my_process(100, 1) for obj in list_of_objects] # multi-process this for-loop

print list_of_numbers
print list_of_results

[0, 1, 2, 3, 4]
[1, 101, 201, 301, 401]
like image 438
bluprince13 Avatar asked Mar 24 '17 14:03

bluprince13


People also ask

Which is the method used to change the default way to create child processes in multiprocessing?

Python provides the ability to create and manage new processes via the multiprocessing. Process class. In multiprocessing programming, we may need to change the technique used to start child processes. This is called the start method.

How do you use multiprocessing in Python?

In this example, at first we import the Process class then initiate Process object with the display() function. Then process is started with start() method and then complete the process with the join() method. We can also pass arguments to the function using args keyword.

What does process join () do?

The join method blocks the execution of the main process until the process whose join method is called terminates. Without the join method, the main process won't wait until the process gets terminated. The example calls the join on the newly created process.


4 Answers

I'm going to go against the grain here, and suggest sticking to the simplest thing that could possibly work ;-) That is, Pool.map()-like functions are ideal for this, but are restricted to passing a single argument. Rather than make heroic efforts to worm around that, simply write a helper function that only needs a single argument: a tuple. Then it's all easy and clear.

Here's a complete program taking that approach, which prints what you want under Python 2, and regardless of OS:

class MyClass():
    def __init__(self, input):
        self.input = input
        self.result = int

    def my_process(self, multiply_by, add_to):
        self.result = self.input * multiply_by
        self._my_sub_process(add_to)
        return self.result

    def _my_sub_process(self, add_to):
        self.result += add_to

import multiprocessing as mp
NUM_CORE = 4  # set to the number of cores you want to use

def worker(arg):
    obj, m, a = arg
    return obj.my_process(m, a)

if __name__ == "__main__":
    list_of_numbers = range(0, 5)
    list_of_objects = [MyClass(i) for i in list_of_numbers]

    pool = mp.Pool(NUM_CORE)
    list_of_results = pool.map(worker, ((obj, 100, 1) for obj in list_of_objects))
    pool.close()
    pool.join()

    print list_of_numbers
    print list_of_results

A big of magic

I should note there are many advantages to taking the very simple approach I suggest. Beyond that it "just works" on Pythons 2 and 3, requires no changes to your classes, and is easy to understand, it also plays nice with all of the Pool methods.

However, if you have multiple methods you want to run in parallel, it can get a bit annoying to write a tiny worker function for each. So here's a tiny bit of "magic" to worm around that. Change worker() like so:

def worker(arg):
    obj, methname = arg[:2]
    return getattr(obj, methname)(*arg[2:])

Now a single worker function suffices for any number of methods, with any number of arguments. In your specific case, just change one line to match:

list_of_results = pool.map(worker, ((obj, "my_process", 100, 1) for obj in list_of_objects))

More-or-less obvious generalizations can also cater to methods with keyword arguments. But, in real life, I usually stick to the original suggestion. At some point catering to generalizations does more harm than good. Then again, I like obvious things ;-)

like image 141
Tim Peters Avatar answered Oct 09 '22 20:10

Tim Peters


If your class is not "huge", I think process oriented is better. Pool in multiprocessing is suggested.
This is the tutorial -> https://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers

Then seperate the add_to from my_process since they are quick and you can wait util the end of the last process.

def my_process(input, multiby):
    return xxxx
def add_to(result,a_list):
    xxx
p = Pool(5)
res = []
for i in range(10):
    res.append(p.apply_async(my_process, (i,5)))
p.join()  # wait for the end of the last process
for i in range(10):
    print res[i].get()
like image 37
Zealseeker Avatar answered Oct 09 '22 20:10

Zealseeker


Generally the easiest way to run the same calculation in parallel is the map method of a multiprocessing.Pool (or the as_completed function from concurrent.futures in Python 3).

However, the map method applies a function that only takes one argument to an iterable of data using multiple processes.

So this function cannot be a normal method, because that requires at least two arguments; it must also include self! It could be a staticmethod, however. See also this answer for a more in-depth explanation.

like image 33
Roland Smith Avatar answered Oct 09 '22 22:10

Roland Smith


Based on the answer of Python Multiprocessing - apply class method to a list of objects and your code:

  1. add MyClass object into simulation object

    class simulation(multiprocessing.Process):
        def __init__(self, id, worker, *args, **kwargs):
            # must call this before anything else
            multiprocessing.Process.__init__(self)
            self.id = id
            self.worker = worker
            self.args = args
            self.kwargs = kwargs
            sys.stdout.write('[%d] created\n' % (self.id))
    
  2. run what you want in run function

        def run(self):
            sys.stdout.write('[%d] running ...  process id: %s\n' % (self.id, os.getpid()))
            self.worker.my_process(*self.args, **self.kwargs)
            sys.stdout.write('[%d] completed\n' % (self.id))
    

Try this:

list_of_numbers = range(0, 5)
list_of_objects = [MyClass(i) for i in list_of_numbers]
list_of_sim = [simulation(id=k, worker=obj, multiply_by=100*k, add_to=10*k) \
    for k, obj in enumerate(list_of_objects)]  

for sim in list_of_sim:
    sim.start()
like image 41
Huu-Danh Pham Avatar answered Oct 09 '22 22:10

Huu-Danh Pham