I have a function `f` that I want to compute on certain large data in parallel. The data can be divided in many ways, and I am trying to decide how to divide it. I am trying to understand exactly how `map` in `multiprocessing.Pool` divides the data, so that I can make the right decision about splitting my data as well as choosing the number of processes. My input data is not simply a list, as in the example below, but rather a list of dictionaries and a list of lists, so understanding how `Pool.map` divides the data seems critical.
That said, I think understanding the simple example will shed light on the more complicated one.
The following script creates a `Pool` of 5 processes and maps over the data `[1, 2, 3]`. What is the implicit choice made here for dividing the data?
```python
from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [1, 2, 3]))
```
It's not documented, so you shouldn't rely on any particular behavior. You can force a specific split by passing the optional `chunksize=` argument. If you don't, a heuristic is used to make up a value of `chunksize` for you. It can be found in the private function `_map_async()`, in your source tree's `Lib/multiprocessing/pool.py`:
```python
def _map_async(self, func, iterable, mapper, chunksize=None, ...
    '''
    Helper function to implement map, starmap and their async counterparts.
    '''
    ...
    if chunksize is None:
        chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
        if extra:
            chunksize += 1
    if len(iterable) == 0:
        chunksize = 0
    ...
```
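The heuristic above can be mirrored in a small standalone function (the name `default_chunksize` is my own, not from the stdlib) to see what values it produces for various input sizes:

```python
def default_chunksize(n_items, n_workers):
    """Mirror the stock Pool.map heuristic: aim for ~4 chunks per worker."""
    if n_items == 0:
        return 0
    # divmod gives the base chunk size plus any remainder
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1  # round up so no items are left over
    return chunksize

print(default_chunksize(3, 5))    # the question's case: 3 items, 5 workers -> 1
print(default_chunksize(500, 5))  # 500 items, 5 workers -> 25
```

With 3 items and 5 workers, `divmod(3, 20)` is `(0, 3)`, and the remainder bumps the chunk size to 1, i.e. items are handed out one at a time.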
`len(self._pool)` is the number of worker processes. So, by default, if there are fewer work items than 4 times the number of processes, they're passed out one at a time. That's the case in your specific example (`3 <= 4*5`). If there are a great many more work items than processes, the chunk size is picked so that each process will be handed a chunk of work approximately 4 times over the life of the `map()`. For example, if you had 500 items in your list, `500 / (5*4) == 25`, and so 25 items at a time would be passed to a worker process.
Why not 100 at a time, so that each of the 5 workers would be invoked just once? Because it's a heuristic ;-) Passing less than that is a tradeoff, balancing the number of times interprocess communication needs to be done against load balancing (the possibility that different work items will require different amounts of time to complete). But nothing about load balancing can be known in advance, so the heuristic gives more (but not absolute!) weight to keeping the number of interprocess calls low.
And that's why it's not documented. It's quite possible that a smarter heuristic will be used someday.