 

Why do Python's map() and multiprocessing.Pool.map() give different answers?

I ran into a strange problem. I have a file of this format:

START
1
2
STOP
lllllllll
START
3
5
6
STOP

I want to read the lines between START and STOP as blocks, and use my_f to process each block.

import itertools

def block_generator(file):
    with open(file) as lines:
        for line in lines:
            if line.strip() == 'START':   # strip the trailing newline
                # Lazily take lines up to the next STOP; note this shares
                # the underlying file iterator with the for loop.
                block = itertools.takewhile(lambda x: x.strip() != 'STOP', lines)
                yield block

In my main function I tried to use map() to get the work done, and it worked:

blocks = block_generator(file)
map(my_f, blocks)

This actually gives me what I want. But when I tried the same thing with multiprocessing.Pool.map(), it gave me an error saying takewhile() expected 2 arguments but got 0:

blocks = block_generator(file)
p = multiprocessing.Pool(4)
p.map(my_f, blocks)

Is this a bug?

  1. The file has more than 1,000,000 blocks, each with fewer than 100 lines.
  2. I accept the answer from unutbu.
  3. But maybe I will simply split the file and run n instances of my original script without multiprocessing, then cat the results together. That way I can never be wrong, as long as the script works on a small file.
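
Edit: for anyone who hits the same error, the root cause appears to be pickling: Pool.map has to pickle each item to ship it to a worker process, and itertools.takewhile objects are not picklable (on Python 2, at least). The exact message differs from a plain pickle error, but a minimal check along these lines shows the problem (my own sketch, not from the accepted answer):

import itertools
import pickle

block = itertools.takewhile(lambda x: x.strip() != 'STOP', iter(['1\n', '2\n', 'STOP\n']))
try:
    pickle.dumps(block)
except Exception as e:
    # On Python 2 this fails, e.g. "can't pickle takewhile objects".
    print 'pickling failed:', e

# A plain list pickles fine, which is why list(takewhile(...)) works:
print pickle.loads(pickle.dumps(['1\n', '2\n']))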
asked by gstar2002

1 Answer

How about:

import itertools
import multiprocessing

def grouper(n, iterable, fillvalue=None):
    # Source: http://docs.python.org/library/itertools.html#recipes
    "grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx"
    return itertools.izip_longest(*[iter(iterable)] * n, fillvalue=fillvalue)

def block_generator(file):
    with open(file) as lines:
        for line in lines:
            if line.strip() == 'START':
                # Materialize the block as a list so it can be pickled
                # and shipped to the worker processes.
                block = list(itertools.takewhile(lambda x: x.strip() != 'STOP', lines))
                yield block

blocks = block_generator(file)
p = multiprocessing.Pool(4)
for chunk in grouper(100, blocks, fillvalue=''):
    p.map(my_f, chunk)

Using grouper limits how much of the file each p.map call consumes, so the whole file need not be read into memory (that is, fed into the task queue) at once.
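
One caveat worth adding (my note, not part of the recipe): izip_longest pads the final chunk with the fillvalue, so the last p.map call will also hand '' to my_f for the padding slots. If my_f cannot tolerate that, strip the padding first:

for chunk in grouper(100, blocks, fillvalue=''):
    # Drop the padding so my_f never sees the fillvalue ''.
    chunk = [block for block in chunk if block != '']
    p.map(my_f, chunk)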


I claim above that when you call p.map(func, iterator), the entire iterator is consumed immediately to fill a task queue. The pool workers then get tasks from the queue and work on the jobs concurrently.

If you look inside pool.py and trace through the definitions, you will see that the _handle_tasks thread gets items from self._taskqueue and enumerates them at once:

         for i, task in enumerate(taskseq):
             ...
             put(task)

The conclusion is that the iterator passed to p.map gets consumed at once; there is no waiting for one task to end before the next is drawn from the iterator and put on the queue.

As further corroboration, run this demonstration code:

import multiprocessing as mp
import time
import logging

def foo(x):
    time.sleep(1)
    return x * x

def blocks():
    # Log every 100th yield so we can watch how fast the iterator is consumed.
    for x in range(1000):
        if x % 100 == 0:
            logger.info('Got here')
        yield x

logger = mp.log_to_stderr(logging.DEBUG)
logger.setLevel(logging.DEBUG)
pool = mp.Pool()
print pool.map(foo, blocks())

You will see the 'Got here' message printed 10 times almost immediately, and then a long pause due to the time.sleep(1) call in foo. This plainly shows the iterator is fully consumed long before the pool processes get around to finishing the tasks.

answered by unutbu

