When I use a generator as the iterable argument to the multiprocessing.Pool.map function:
pool.map(func, iterable=(x for x in range(10)))
It seems that the generator is fully exhausted before func
is ever called.
I want each item to be yielded and passed to a process as it is produced. Thanks.
A generator function is any function that contains the yield keyword. Calling a generator function returns a generator object, which is an iterator, so it can be used with iter() and in a for loop.
The built-in map() returns a map object, which is an iterator that yields items on demand. The natural replacement for map() is therefore a generator expression, because generator expressions produce generator objects, which are also iterators that yield items on demand.
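A minimal sketch of that equivalence (the square helper and the inputs are just an illustration, not taken from the question):

import itertools

def square(x):
    return x * x

lazy_map = map(square, range(10))          # map object, nothing computed yet
lazy_gen = (square(x) for x in range(10))  # generator object, nothing computed yet

print(next(lazy_map))                       # 0 -- items are produced on demand
print(list(itertools.islice(lazy_gen, 3)))  # [0, 1, 4]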
Pool is generally used for heterogeneous tasks, whereas multiprocessing.Process is generally used for homogeneous tasks. Heterogeneous tasks are tasks that do not resemble each other; for example, each task submitted to the process pool may use a different target function.
Use the multiprocessing pool if your tasks are independent: each task does not depend on other tasks that could execute at the same time, and ideally does not depend on any data other than what is passed to it via function arguments. A minimal sketch of such tasks follows below.
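Here is a small, self-contained example of independent tasks, assuming a purely argument-driven cpu_task function (the function and the input range are placeholders, not from the question):

import multiprocessing as mp

def cpu_task(n):
    # Depends only on its argument -- no shared state between tasks.
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    with mp.Pool() as pool:
        results = pool.map(cpu_task, range(8))
    print(results)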
Alas, this isn't well-defined. Here's a test case I'm running under Python 3.6.1:
import multiprocessing as mp

def e(i):
    if i % 1000000 == 0:
        print(i)

if __name__ == '__main__':
    p = mp.Pool()

    def g():
        for i in range(100000000):
            yield i
        print("generator done")

    r = p.map(e, g())
    p.close()
    p.join()
The first thing you see is the "generator done" message, and peak memory use is unreasonably high (precisely because, as you suspect, the generator is run to exhaustion before any work is passed out).
However, replace the map() call like so:
r = list(p.imap(e, g()))
Now memory use remains small, and "generator done" appears at the output end.
However, you won't wait long enough to see that, because it's horridly slow :-( imap() not only treats that iterable as an iterable, but effectively passes only one item at a time across process boundaries. To get speed back too, this works:
r = list(p.imap(e, g(), chunksize=10000))
In real life, I'm much more likely to iterate over an imap() (or imap_unordered()) result than to force it into a list, and then memory use remains small for looping over the results too.
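A rough sketch of that pattern, assuming a placeholder work function and chunk size (neither is part of the original answer):

import multiprocessing as mp

def work(i):
    return i * i

def items():
    for i in range(1000000):
        yield i

if __name__ == '__main__':
    with mp.Pool() as pool:
        # Results are consumed as they arrive; nothing is accumulated in a list.
        for result in pool.imap_unordered(work, items(), chunksize=10000):
            pass  # handle each result here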
multiprocessing.Pool.map converts iterables without a __len__ method to a list before processing. This is done to aid the calculation of chunksize, which the pool uses to group worker arguments and reduce the round-trip cost of scheduling jobs. This is not optimal, especially when chunksize is 1, but since map must exhaust the iterator one way or the other, it's usually not a significant issue. The relevant code is in pool.py. Notice its use of len:
def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
        error_callback=None):
    '''
    Helper function to implement map, starmap and their async counterparts.
    '''
    if self._state != RUN:
        raise ValueError("Pool not running")
    if not hasattr(iterable, '__len__'):
        iterable = list(iterable)

    if chunksize is None:
        chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
        if extra:
            chunksize += 1
    if len(iterable) == 0:
        chunksize = 0
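A quick way to see why a generator hits the list() branch above (an interactive illustration, not from the original answer):

gen = (x for x in range(10))
print(hasattr(gen, '__len__'))        # False -> Pool.map calls list(gen) first
print(hasattr([1, 2, 3], '__len__'))  # True  -> used directly for the chunksize math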