There is a strange behavior of map when using Python's multiprocessing.Pool. In the example below, a pool of 4 processes works on 28 tasks. Each task takes 4 seconds, so this should take seven passes.
However, it takes 8 passes. In the first six passes all processors are engaged. In the 7th pass only two tasks are completed (two processors idle). The remaining 2 tasks are finished in the 8th pass (again with two processors idle). This behavior appears for seemingly random combinations of number of CPUs and number of tasks, and it wastes time unnecessarily.
This example has been reproduced on both Intel Xeon Haswell (20 cores) and Intel i7 (4 cores).
Any ideas on how to force Pool to make use of all available processors in all the passes?
import time
import multiprocessing
from multiprocessing import Pool
import datetime

def f(values):
    # Log which worker picked up the task and when.
    now = str(datetime.datetime.now())
    proc_id = str(multiprocessing.current_process())
    print(proc_id + ' ' + now)
    a = values**2
    time.sleep(4)  # simulate 4 seconds of work per task
    return a

if __name__ == '__main__':
    p = Pool(4)  # number of processes
    processed_values = p.map(f, range(28))
    p.close()
    p.join()
    print(processed_values)
The output of the run is given below:
<Process(PoolWorker-1, started daemon)> 2016-05-13 17:08:49.604065
<Process(PoolWorker-2, started daemon)> 2016-05-13 17:08:49.604189
<Process(PoolWorker-3, started daemon)> 2016-05-13 17:08:49.604252
<Process(PoolWorker-4, started daemon)> 2016-05-13 17:08:49.604866
<Process(PoolWorker-1, started daemon)> 2016-05-13 17:08:53.608475
<Process(PoolWorker-2, started daemon)> 2016-05-13 17:08:53.608878
<Process(PoolWorker-3, started daemon)> 2016-05-13 17:08:53.608931
<Process(PoolWorker-4, started daemon)> 2016-05-13 17:08:53.609503
<Process(PoolWorker-1, started daemon)> 2016-05-13 17:08:57.612831
<Process(PoolWorker-2, started daemon)> 2016-05-13 17:08:57.613135
<Process(PoolWorker-3, started daemon)> 2016-05-13 17:08:57.613555
<Process(PoolWorker-4, started daemon)> 2016-05-13 17:08:57.614065
<Process(PoolWorker-1, started daemon)> 2016-05-13 17:09:01.616974
<Process(PoolWorker-2, started daemon)> 2016-05-13 17:09:01.617273
<Process(PoolWorker-3, started daemon)> 2016-05-13 17:09:01.617699
<Process(PoolWorker-4, started daemon)> 2016-05-13 17:09:01.618190
<Process(PoolWorker-1, started daemon)> 2016-05-13 17:09:05.621284
<Process(PoolWorker-2, started daemon)> 2016-05-13 17:09:05.621489
<Process(PoolWorker-3, started daemon)> 2016-05-13 17:09:05.622130
<Process(PoolWorker-4, started daemon)> 2016-05-13 17:09:05.622404
<Process(PoolWorker-1, started daemon)> 2016-05-13 17:09:09.625522
<Process(PoolWorker-2, started daemon)> 2016-05-13 17:09:09.625631
<Process(PoolWorker-3, started daemon)> 2016-05-13 17:09:09.626555
<Process(PoolWorker-4, started daemon)> 2016-05-13 17:09:09.626566
<Process(PoolWorker-1, started daemon)> 2016-05-13 17:09:13.629761
<Process(PoolWorker-2, started daemon)> 2016-05-13 17:09:13.629846
<Process(PoolWorker-1, started daemon)> 2016-05-13 17:09:17.634003
<Process(PoolWorker-2, started daemon)> 2016-05-13 17:09:17.634317
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729]
This is related to the following question, which does not have a clear or correct answer: "Python: Multiprocessing Map takes longer to complete last few processes".
This is caused by the way Pool.map chunks up the iterable you pass it and sends the chunks to the workers in the Pool. If you force the chunksize to be 1, you'll see the behavior you expect:
import time
import multiprocessing
from multiprocessing import Pool
import datetime

def f(values):
    # Log which worker picked up the task and when.
    now = str(datetime.datetime.now())
    proc_id = str(multiprocessing.current_process())
    print(proc_id + ' ' + now)
    a = values**2
    time.sleep(4)  # simulate 4 seconds of work per task
    return a

if __name__ == '__main__':
    p = Pool(4)  # number of processes
    # chunksize=1 hands out one item at a time, so no worker sits idle
    processed_values = p.map(f, range(28), chunksize=1)
    p.close()
    p.join()
    print(processed_values)
Output:
<Process(PoolWorker-1, started daemon)> 2016-05-13 21:34:06.548733
<Process(PoolWorker-2, started daemon)> 2016-05-13 21:34:06.548803
<Process(PoolWorker-3, started daemon)> 2016-05-13 21:34:06.549013
<Process(PoolWorker-4, started daemon)> 2016-05-13 21:34:06.549052
<Process(PoolWorker-4, started daemon)> 2016-05-13 21:34:10.549509
<Process(PoolWorker-3, started daemon)> 2016-05-13 21:34:10.551091
<Process(PoolWorker-1, started daemon)> 2016-05-13 21:34:10.553057
<Process(PoolWorker-2, started daemon)> 2016-05-13 21:34:10.553263
<Process(PoolWorker-2, started daemon)> 2016-05-13 21:34:14.553765
<Process(PoolWorker-4, started daemon)> 2016-05-13 21:34:14.553821
<Process(PoolWorker-3, started daemon)> 2016-05-13 21:34:14.554953
<Process(PoolWorker-1, started daemon)> 2016-05-13 21:34:14.557262
<Process(PoolWorker-3, started daemon)> 2016-05-13 21:34:18.556535
<Process(PoolWorker-2, started daemon)> 2016-05-13 21:34:18.556611
<Process(PoolWorker-4, started daemon)> 2016-05-13 21:34:18.558019
<Process(PoolWorker-1, started daemon)> 2016-05-13 21:34:18.561597
<Process(PoolWorker-2, started daemon)> 2016-05-13 21:34:22.560039
<Process(PoolWorker-3, started daemon)> 2016-05-13 21:34:22.560097
<Process(PoolWorker-4, started daemon)> 2016-05-13 21:34:22.562236
<Process(PoolWorker-1, started daemon)> 2016-05-13 21:34:22.565912
<Process(PoolWorker-2, started daemon)> 2016-05-13 21:34:26.564383
<Process(PoolWorker-3, started daemon)> 2016-05-13 21:34:26.564430
<Process(PoolWorker-4, started daemon)> 2016-05-13 21:34:26.564589
<Process(PoolWorker-1, started daemon)> 2016-05-13 21:34:26.570232
<Process(PoolWorker-2, started daemon)> 2016-05-13 21:34:30.568634
<Process(PoolWorker-3, started daemon)> 2016-05-13 21:34:30.568647
<Process(PoolWorker-4, started daemon)> 2016-05-13 21:34:30.568752
<Process(PoolWorker-1, started daemon)> 2016-05-13 21:34:30.574456
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729]
The algorithm that map uses to pick a chunksize when you don't provide one looks like this:
if chunksize is None:
    chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
    if extra:
        chunksize += 1
if len(iterable) == 0:
    chunksize = 0
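For an iterable of size 28 and a pool of 4 workers, that comes out to 2: divmod(28, 16) is (1, 12), and the nonzero remainder bumps the chunksize from 1 to 2. This means that each worker process grabs two items from your iterable at a time, not one, so the 28 items become 14 chunks. After three rounds of four chunks, only two chunks (four items) are left in the queue. The first free worker gets two items and the second free worker gets the other two, leaving nothing for the remaining two workers. Each of those two busy workers then runs its two items back to back, which is why passes 7 and 8 both show only two active processes.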
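To make the arithmetic concrete, here is a minimal sketch that reproduces the chunksize rule quoted above and estimates the wall time under a simple model. The helper names default_chunksize and estimate_wall_time are hypothetical and not part of multiprocessing. The model assumes every item takes the same time, that each free worker simply takes the next chunk, and that IPC overhead is zero.

```python
import heapq

def default_chunksize(n_items, n_workers):
    """Hypothetical helper mirroring the chunksize rule quoted above."""
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1
    if n_items == 0:
        chunksize = 0
    return chunksize

def estimate_wall_time(n_items, n_workers, chunksize, seconds_per_item):
    """Simplified model: the earliest-free worker always takes the next chunk."""
    if n_items == 0:
        return 0.0
    # Size of each chunk in order; the last chunk may be shorter.
    chunk_sizes = [min(chunksize, n_items - start)
                   for start in range(0, n_items, chunksize)]
    # Min-heap holding the time at which each worker becomes free.
    free_at = [0.0] * n_workers
    for size in chunk_sizes:
        earliest = heapq.heappop(free_at)
        heapq.heappush(free_at, earliest + size * seconds_per_item)
    return max(free_at)

size = default_chunksize(28, 4)
print(size)                                # default chunksize for the example
print(estimate_wall_time(28, 4, size, 4))  # modeled time with the default
print(estimate_wall_time(28, 4, 1, 4))     # modeled time with chunksize=1
```

Under these assumptions the default chunksize gives 32 seconds (eight 4-second passes), while chunksize=1 gives 28 seconds (seven passes), which matches the two timestamp logs above.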
The reason for chunking in the first place is that it greatly improves performance when dealing with very large iterables, by reducing IPC overhead. For smaller iterables it tends not to make much difference, and it can even hurt performance, as it does in this case.
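As a rough illustration of that trade-off, the sketch below counts how many chunks an iterable is split into for a given chunksize. The helper split_into_chunks is a hypothetical stand-in written for this example, not the Pool's own code, and the chunk count is only a proxy for IPC cost, assuming each chunk is dispatched to a worker as one task.

```python
from itertools import islice

def split_into_chunks(iterable, size):
    """Hypothetical helper: yield successive tuples of up to `size` items."""
    it = iter(iterable)
    while True:
        chunk = tuple(islice(it, size))
        if not chunk:
            return
        yield chunk

# The 28-item example with the default chunksize of 2.
small = list(split_into_chunks(range(28), 2))
print(len(small))

# A larger iterable of 100,000 items with 4 workers: the quoted rule gives
# divmod(100000, 16) == (6250, 0), so the default chunksize is 6250.
n_chunked = sum(1 for _ in split_into_chunks(range(100000), 6250))
n_unchunked = sum(1 for _ in split_into_chunks(range(100000), 1))
print(n_chunked, n_unchunked)
```

With the default chunksize the large iterable is sent as 16 chunks instead of 100,000 single-item tasks. When each item is cheap, that reduction matters far more than the few seconds of idle time at the end. When each item takes seconds, as in the question, the idle time dominates and chunksize=1 is the better choice.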