At the bottom is the code I have now. It seems to work fine. However, I don't completely understand it. I thought that without .join(), I'd risk the code moving on to the next iteration of the for-loop before the pool finishes executing. Wouldn't we need those 3 commented-out lines? On the other hand, if I were to go with the .close() and .join() approach, is there any way to 'reopen' that closed pool instead of creating a new Pool(6) every time?
import multiprocessing as mp
import random as rdm
from statistics import stdev, mean
import time


def mesh_subset(population, n_chosen=5):
    chosen = rdm.choices(population, k=n_chosen)
    return mean(chosen)


if __name__ == '__main__':
    population = [x for x in range(20)]
    N_iteration = 10
    start_time = time.time()
    pool = mp.Pool(6)
    for i in range(N_iteration):
        print([round(x, 2) for x in population])
        print(stdev(population))
        # pool = mp.Pool(6)
        population = pool.map(mesh_subset, [population]*len(population))
        # pool.close()
        # pool.join()
    print('run time:', time.time() - start_time)
A pool of workers is a relatively costly thing to set up, so it should be done (if possible) only once, usually at the beginning of the script.
The pool.map call blocks until all the tasks are completed, and then it returns a list of the results. It couldn't do that unless mesh_subset had been called on all the inputs and had returned a result for each. In contrast, methods like pool.apply_async do not block. apply_async returns an ApplyResult object with a get method which blocks until it obtains a result from a worker process.
pool.close sets the worker handler's state to CLOSE. This causes the handler to signal the workers to terminate once all outstanding tasks have been completed. pool.join blocks until all the worker processes have been terminated.
So you don't need to call -- in fact you shouldn't call -- pool.close and pool.join until you are finished with the pool. Once the workers have been sent the signal to terminate (by pool.close), there is no way to "reopen" them. You would need to start a new pool instead.
In your situation, since you do want the loop to wait until all the tasks are completed, there would be no advantage to using pool.apply_async instead of pool.map. But if you were to use pool.apply_async, you could obtain the same result as before by calling get instead of resorting to closing and restarting the pool:
# you could do this, but using pool.map is simpler
for i in range(N_iteration):
    apply_results = [pool.apply_async(mesh_subset, [population])
                     for i in range(len(population))]
    # the call to result.get() blocks until its worker process (running
    # mesh_subset) returns a value
    population = [result.get() for result in apply_results]
When the loops complete, len(population) is unchanged.
If you did NOT want each loop to block until all the tasks are completed, you could use apply_async's callback feature:
N_pop = len(population)
result = []
for i in range(N_iteration):
    for i in range(N_pop):
        pool.apply_async(mesh_subset, [population],
                         callback=result.append)
pool.close()
pool.join()
print(result)
Now, when any mesh_subset returns a return_value, result.append(return_value) is called. The calls to apply_async do not block, so N_iteration * N_pop tasks are pushed into the pool's task queue all at once. But since the pool has 6 workers, at most 6 calls to mesh_subset are running at any given time. As the workers complete the tasks, whichever worker finishes first calls result.append(return_value). So the values in result are unordered. This is different from pool.map, which returns a list whose return values are in the same order as its corresponding list of arguments.
Barring an exception, result will eventually contain N_iteration * N_pop return values once all the tasks complete. Above, pool.close() and pool.join() were used to wait for all the tasks to complete.