 

python pool apply_async and map_async do not block on full queue

I am fairly new to Python. I am using the multiprocessing module for reading lines of text on stdin, converting them in some way, and writing them into a database. Here's a snippet of my code:

import sys
import multiprocessing

batch = []
pool = multiprocessing.Pool(20)
i = 0
for i, content in enumerate(sys.stdin):
    batch.append(content)
    if len(batch) >= 10000:
        pool.apply_async(insert, args=(batch, i+1))
        batch = []
pool.apply_async(insert, args=(batch, i))
pool.close()
pool.join()

Now this all works fine, until I process huge input files (hundreds of millions of lines) that I pipe into my Python program. At some point, when my database gets slower, I see the memory filling up.

After some playing around, it turned out that pool.apply_async, as well as pool.map_async, never block, so the queue of calls waiting to be processed grows bigger and bigger.

What is the correct approach to my problem? I would expect a parameter that I can set so that the pool.apply_async call blocks as soon as a certain queue length has been reached. AFAIR in Java one can give the ThreadPoolExecutor a BlockingQueue with a fixed length for that purpose.

Thanks!

konstantin asked Mar 07 '12


People also ask

How does pool Apply_async work?

The apply_async() function can be called directly to execute a target function in the process pool. The call will not block; it immediately returns an AsyncResult object, which can be ignored if the function does not return a value.
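
For illustration, a minimal sketch of that behaviour (the save_batch function is invented for this example):

from multiprocessing import Pool

def save_batch(batch):
    # stand-in for real work, e.g. a database insert
    return len(batch)

if __name__ == '__main__':
    pool = Pool(4)
    result = pool.apply_async(save_batch, args=(['row1', 'row2'],))  # returns immediately
    print(result.get())   # get() blocks until the task finishes; prints 2
    pool.close()
    pool.join()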

What does multiprocessing pool do in Python?

Python multiprocessing Pool can be used for parallel execution of a function across multiple input values, distributing the input data across processes (data parallelism).
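
A minimal data-parallel sketch (the square function is just for illustration):

from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    pool = Pool(4)                            # 4 worker processes
    print(pool.map(square, range(10)))        # input values are distributed across the workers
    pool.close()
    pool.join()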

How do process pools work in multiprocessing?

Pool is generally used for heterogeneous tasks, whereas multiprocessing.Process is generally used for homogeneous tasks. The Pool is designed to execute heterogeneous tasks, that is, tasks that do not resemble each other; for example, each task submitted to the process pool may be a different target function.
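
A rough sketch contrasting the two APIs (download and parse are invented placeholder tasks):

from multiprocessing import Pool, Process

def download(url):
    pass  # placeholder task

def parse(path):
    pass  # placeholder task

if __name__ == '__main__':
    # Pool: a fixed set of reusable worker processes; different target
    # functions can be submitted to the same pool
    pool = Pool(4)
    pool.apply_async(download, args=('http://example.com',))
    pool.apply_async(parse, args=('/tmp/data.txt',))
    pool.close()
    pool.join()

    # Process: one dedicated process per task, started and joined by hand
    p = Process(target=download, args=('http://example.com',))
    p.start()
    p.join()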

How do you pass multiple arguments in multiprocessing Python?

Use Pool.starmap(). The multiprocessing pool starmap() function will call the target function with multiple arguments; as such, it can be used instead of the map() function. This is probably the preferred approach for executing a target function in the multiprocessing pool when it takes multiple arguments.
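
A minimal starmap() sketch (requires Python 3.3+; insert_batch is invented for the example):

from multiprocessing import Pool

def insert_batch(batch, batch_id):
    return batch_id, len(batch)

if __name__ == '__main__':
    jobs = [(['a', 'b'], 1), (['c', 'd'], 2)]
    pool = Pool(4)
    # each (batch, batch_id) tuple is unpacked into the arguments of insert_batch
    print(pool.starmap(insert_batch, jobs))   # [(1, 2), (2, 2)]
    pool.close()
    pool.join()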


2 Answers

The apply_async and map_async functions are designed not to block the main process. To achieve that, the Pool maintains an internal Queue whose size is unfortunately impossible to change.

The problem can be solved by using a Semaphore initialized with the size you want the queue to be: acquire the semaphore before feeding the pool, and release it after a worker has completed a task.

Here's an example working with Python 2.6 or greater.

from threading import Semaphore
from multiprocessing import Pool

def task_wrapper(f):
    """Python 2 does not allow a callback for a method raising exceptions;
    this wrapper ensures the code run in the worker is exception free.

    """
    try:
        return f()
    except:
        return None

class TaskManager(object):
    def __init__(self, processes, queue_size):
        self.pool = Pool(processes=processes)
        self.workers = Semaphore(processes + queue_size)

    def new_task(self, f):
        """Start a new task, blocks if queue is full."""
        self.workers.acquire()
        self.pool.apply_async(task_wrapper, args=(f, ), callback=self.task_done)

    def task_done(self, result):
        """Called once a task is done; releases the semaphore so a
        blocked new_task call can proceed."""
        self.workers.release()

Another example uses the concurrent.futures pool implementation.
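
That example is not included here, but a rough sketch of the same bounded-submission idea with concurrent.futures (the BoundedExecutor name and queue_size parameter are illustrative, not part of the library) could look like this:

from concurrent.futures import ProcessPoolExecutor
from threading import BoundedSemaphore

class BoundedExecutor(object):
    """Wrap ProcessPoolExecutor so that submit() blocks once
    processes + queue_size tasks are in flight."""

    def __init__(self, processes, queue_size):
        self.executor = ProcessPoolExecutor(max_workers=processes)
        self.semaphore = BoundedSemaphore(processes + queue_size)

    def submit(self, fn, *args, **kwargs):
        self.semaphore.acquire()                 # blocks while the queue is full
        future = self.executor.submit(fn, *args, **kwargs)
        # release the slot as soon as the task finishes (or fails)
        future.add_done_callback(lambda _: self.semaphore.release())
        return future

    def shutdown(self):
        self.executor.shutdown(wait=True)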

noxdafox answered Nov 10 '22


Just in case someone ends up here, this is how I solved the problem: I stopped using multiprocessing.Pool. Here is how I do it now:

import sys
import multiprocessing

#set amount of concurrent processes that insert db data
processes = multiprocessing.cpu_count() * 2

#setup batch queue with a bounded size, so put() blocks when it is full
queue = multiprocessing.Queue(processes * 2)

#start processes
for _ in range(processes):
    multiprocessing.Process(target=insert, args=(queue,)).start()

#fill queue with batches
batch = []
for i, content in enumerate(sys.stdin):
    batch.append(content)
    if len(batch) >= 10000:
        queue.put((batch, i+1))
        batch = []
if batch:
    queue.put((batch, i+1))

#stop processes using poison-pill
for _ in range(processes):
    queue.put((None, None))

print "all done."

In the insert method, the processing of each batch is wrapped in a loop that pulls from the queue until it receives the poison pill:

def insert(queue):
    while True:
        batch, end = queue.get()
        if batch is None and end is None:
            break  #poison pill! complete!
        # ... process the batch ...
    print 'worker done.'

konstantin answered Nov 10 '22