How to reuse a multiprocessing pool?

At the bottom is the code I have now. It seems to work fine. However, I don't completely understand it. I thought that without .join(), I'd risk the code moving on to the next iteration of the for-loop before the pool finished executing. Wouldn't we need those three commented-out lines?

On the other hand, if I were to go the .close() and .join() route, is there any way to 'reopen' that closed pool, instead of creating a new Pool(6) every time?

import multiprocessing as mp
import random as rdm
from statistics import stdev, mean
import time


def mesh_subset(population, n_chosen=5):
    chosen = rdm.choices(population, k=n_chosen)
    return mean(chosen)


if __name__ == '__main__':
    population = [x for x in range(20)]
    N_iteration = 10
    start_time = time.time()
    pool = mp.Pool(6)
    for i in range(N_iteration):
        print([round(x, 2) for x in population])
        print(stdev(population))
        # pool = mp.Pool(6)
        population = pool.map(mesh_subset, [population]*len(population))
        # pool.close()
        # pool.join()
    print('run time:', time.time() - start_time)
asked Dec 24 '18 by Indominus


1 Answer

A pool of workers is a relatively costly thing to set up, so it should be done (if possible) only once, usually at the beginning of the script.

The pool.map call blocks until all the tasks are completed, and then returns a list of the results. It couldn't do that unless mesh_subset had been called on all the inputs and had returned a result for each. In contrast, methods like pool.apply_async do not block: apply_async returns an AsyncResult object whose get method blocks until it obtains a result from a worker process.
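
For example, here is a minimal sketch contrasting the two (the slow_square function and its one-second sleep are illustrative stand-ins, not from the question):

import multiprocessing as mp
import time


def slow_square(x):
    # stand-in task that takes about a second
    time.sleep(1)
    return x * x


if __name__ == '__main__':
    with mp.Pool(2) as pool:
        # pool.map blocks here until every task has finished,
        # then returns the results in input order
        print(pool.map(slow_square, [1, 2, 3, 4]))

        # pool.apply_async returns immediately with an AsyncResult;
        # the blocking happens later, inside .get()
        async_result = pool.apply_async(slow_square, (5,))
        print(async_result.get())  # blocks until the worker returns 25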

pool.close sets the worker handler's state to CLOSE. No new tasks can be submitted after that, and once the outstanding tasks are finished, the handler signals the workers to terminate.

pool.join blocks until all the worker processes have terminated.

So you don't need to call -- in fact you shouldn't call -- pool.close and pool.join until you are finished with the pool. Once the workers have been sent the signal to terminate (by pool.close), there is no way to "reopen" them. You would need to start a new pool instead.
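
If you genuinely wanted a fresh pool on every pass, it would look like the sketch below. This spawns and tears down six workers per iteration, which is exactly the overhead that keeping one pool avoids:

for i in range(N_iteration):
    # the with-block calls pool.terminate() on exit; a closed or
    # terminated pool cannot be reused, so each pass must build a new one
    with mp.Pool(6) as pool:
        population = pool.map(mesh_subset, [population] * len(population))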


In your situation, since you do want the loop to wait until all the tasks are completed, there would be no advantage to using pool.apply_async instead of pool.map. But if you were to use pool.apply_async, you could obtain the same result as before by calling get instead of resorting to closing and restarting the pool:

# you could do this, but using pool.map is simpler
for i in range(N_iteration):
    apply_results = [pool.apply_async(mesh_subset, [population]) for _ in range(len(population))]
    # the call to result.get() blocks until its worker process (running
    # mesh_subset) returns a value
    population = [result.get() for result in apply_results]

When the loops complete, len(population) is unchanged.


If you did NOT want each loop to block until all the tasks are completed, you could use apply_async's callback feature:

N_pop = len(population)
result = []
for i in range(N_iteration):
    for j in range(N_pop):
        pool.apply_async(mesh_subset, [population],
                         callback=result.append)
pool.close()
pool.join()
print(result)

Now, whenever a call to mesh_subset returns a return_value, result.append(return_value) is called. The calls to apply_async do not block, so all N_iteration * N_pop tasks are pushed into the pool's task queue at once. But since the pool has 6 workers, at most 6 calls to mesh_subset are running at any given time. As the workers complete tasks, the results are appended in whatever order they finish, so the values in result are unordered. This is different from pool.map, which returns a list whose values are in the same order as its corresponding list of arguments.

Barring an exception, result will eventually contain all N_iteration * N_pop return values once the tasks finish. Above, pool.close() and pool.join() were used to wait for all the tasks to complete.
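
If you wanted the callback-style results back in submission order, one possibility (my own sketch, not part of the answer above) is to carry the submission index through the worker and sort at the end. The wrapper indexed_mesh_subset is hypothetical and must live at module level so it can be pickled:

def indexed_mesh_subset(i, population):
    # return (submission index, result) so the original order can be rebuilt
    return i, mesh_subset(population)

indexed_results = []
for i in range(N_pop):
    pool.apply_async(indexed_mesh_subset, (i, population),
                     callback=indexed_results.append)
pool.close()
pool.join()
# sort by the index each task carried along, then drop it
ordered = [value for _, value in sorted(indexed_results)]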

answered by unutbu