How to use a generator as an iterable with Multiprocessing map function

When I use a generator as the iterable argument with the multiprocessing.Pool.map function:

pool.map(func, iterable=(x for x in range(10)))

It seems that the generator is fully exhausted before func is ever called.

I want the generator to yield each item lazily, passing it to the worker processes one at a time instead of building the whole list up front. Thanks!

asked Jun 22 '17 by RustyShackleford


People also ask

Is a generator function iterable?

Generators are functions containing a yield keyword. Any function which has "yield" in it is a generator function. Calling a generator function creates an iterable. Since it is an iterable, it can be used with iter() and with a for loop.
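
A minimal sketch to illustrate (count_up_to is a made-up name):

def count_up_to(n):
    # Any function containing `yield` is a generator function.
    for i in range(n):
        yield i

gen = count_up_to(3)
print(iter(gen) is gen)   # True: a generator is its own iterator
for value in count_up_to(3):
    print(value)          # prints 0, 1, 2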

Is map function a generator?

map() returns a map object, which is an iterator that yields items on demand. So, the natural replacement for map() is a generator expression because generator expressions return generator objects, which are also iterators that yield items on demand.
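
For example, these two produce equivalent lazy iterators (a small sketch, not from the question):

squares_map = map(lambda x: x * x, range(5))   # map object, lazy
squares_gen = (x * x for x in range(5))        # generator, also lazy

print(next(squares_map))  # 0, computed only on demand
print(list(squares_gen))  # [0, 1, 4, 9, 16]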

What is the difference between pool and process in multiprocessing?

multiprocessing.Pool is generally used for heterogeneous tasks, whereas multiprocessing.Process is generally used for homogeneous tasks. The Pool is designed to execute heterogeneous tasks, that is, tasks that do not resemble each other. For example, each task submitted to the process pool may be a different target function.
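
A rough sketch of the two styles (download and parse are hypothetical stand-ins):

import multiprocessing as mp

def download(url):
    # Hypothetical task: pretend to fetch a URL.
    return "fetched " + url

def parse(doc):
    # A different hypothetical task: pretend to parse a document.
    return len(doc)

if __name__ == '__main__':
    # Pool: different (heterogeneous) tasks submitted to one pool of workers.
    with mp.Pool() as pool:
        a = pool.apply_async(download, ('http://example.com',))
        b = pool.apply_async(parse, ('<html></html>',))
        print(a.get(), b.get())

    # Process: a dedicated process running one target function.
    p = mp.Process(target=download, args=('http://example.com',))
    p.start()
    p.join()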

When would you use a multiprocessing pool?

Use the multiprocessing pool if your tasks are independent. This means that each task is not dependent on other tasks that could execute at the same time. It also may mean tasks that are not dependent on any data other than data provided via function arguments to the task.
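
As a sketch, independent tasks that depend only on their own arguments (crunch is a made-up example):

import multiprocessing as mp

def crunch(n):
    # Depends only on its argument, not on other tasks or shared state.
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    with mp.Pool() as pool:
        print(pool.map(crunch, [10000, 20000, 30000]))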


2 Answers

Alas, this isn't well-defined. Here's a test case I'm running under Python 3.6.1:

import multiprocessing as mp

def e(i):
    if i % 1000000 == 0:
        print(i)

if __name__ == '__main__':
    p = mp.Pool()
    def g():
        for i in range(100000000):
            yield i
        print("generator done")
    r = p.map(e, g())
    p.close()
    p.join()

The first thing you see is the "generator done" message, and peak memory use is unreasonably high (precisely because, as you suspect, the generator is run to exhaustion before any work is passed out).

However, replace the map() call like so:

r = list(p.imap(e, g()))

Now memory use remains small, and "generator done" appears at the output end.

However, you won't wait long enough to see that, because it's horridly slow :-( imap() not only treats that iterable as an iterable, but effectively passes only 1 item at a time across process boundaries. To get speed back too, this works:

r = list(p.imap(e, g(), chunksize=10000))

In real life, I'm much more likely to iterate over an imap() (or imap_unordered()) result than to force it into a list, and then memory use remains small for looping over the results too.
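
That streaming pattern looks roughly like this (reusing e() and g() from the test case above, with g() moved to module level):

import multiprocessing as mp

def e(i):
    if i % 1000000 == 0:
        print(i)

def g():
    for i in range(100000000):
        yield i

if __name__ == '__main__':
    p = mp.Pool()
    # Results are consumed as they arrive, so memory stays bounded even
    # though 100 million items flow through the pool.
    for result in p.imap_unordered(e, g(), chunksize=10000):
        pass  # handle each result here
    p.close()
    p.join()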

answered Nov 15 '22 by Tim Peters


multiprocessing.Pool.map converts iterables without a __len__ method to a list before processing. This is done to aid the calculation of chunksize, which the pool uses to group worker arguments and reduce the round-trip cost of scheduling jobs. This is not optimal, especially when chunksize is 1, but since map must exhaust the iterator one way or the other, it's usually not a significant issue.

The relevant code is in pool.py. Notice its use of len:

def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
        error_callback=None):
    '''
    Helper function to implement map, starmap and their async counterparts.
    '''
    if self._state != RUN:
        raise ValueError("Pool not running")
    if not hasattr(iterable, '__len__'):
        iterable = list(iterable)

    if chunksize is None:
        chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
        if extra:
            chunksize += 1
    if len(iterable) == 0:
        chunksize = 0
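
A quick way to see which branch an argument takes (a small check, not part of pool.py):

gen = (x for x in range(10))
print(hasattr(gen, '__len__'))        # False: map() converts it to a list
print(hasattr(range(10), '__len__'))  # True: used as-is, len() works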
answered Nov 15 '22 by tdelaney