Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Who runs the callback when using apply_async method of a multiprocessing pool?

I'm trying to understand a little bit of what's going on behind the scenes when using the apply_sync method of a multiprocessing pool.

Who runs the callback method? Is it the main process that called apply_async?

Let's say I send out a whole bunch of apply_async commands with callbacks and then continue with my program. My program is still doing things when the apply_async's start to finish. How does the callback get run my the "main process" while the main process is still busy with the script?

Here's an example.

import multiprocessing import time  def callback(x):     print '{} running callback with arg {}'.format(multiprocessing.current_process().name, x)  def func(x):     print '{} running func with arg {}'.format(multiprocessing.current_process().name, x)     return x  pool = multiprocessing.Pool()  args = range(20)  for a in args:     pool.apply_async(func, (a,), callback=callback)  print '{} going to sleep for a minute'.format(multiprocessing.current_process().name)  t0 = time.time() while time.time() - t0 < 60:     pass  print 'Finished with the script' 

The output is something like

PoolWorker-1 running func with arg 0

PoolWorker-2 running func with arg 1

PoolWorker-3 running func with arg 2

MainProcess going to sleep for a minute <-- main process is busy

PoolWorker-4 running func with arg 3

PoolWorker-1 running func with arg 4

PoolWorker-2 running func with arg 5

PoolWorker-3 running func with arg 6

PoolWorker-4 running func with arg 7

MainProcess running callback with arg 0 <-- main process running callback while it's still in the while loop!!

MainProcess running callback with arg 1

MainProcess running callback with arg 2

MainProcess running callback with arg 3

MainProcess running callback with arg 4

PoolWorker-1 running func with arg 8

...

Finished with script

How is MainProcess running the callback while it's in the middle of that while loop??

There is this statement about the callback in the documentation for multiprocessing.Pool that seems like a hint but I don't understand it.

apply_async(func[, args[, kwds[, callback]]])

A variant of the apply() method which returns a result object.

If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it (unless the call failed). callback should complete immediately since otherwise the thread which handles the results will get blocked.

like image 902
Alex Avatar asked Jul 16 '14 01:07

Alex


People also ask

How does pool apply_async work?

The apply_async() function can be called directly to execute a target function in the process pool. The call will not block, but will instead immediately return an AsyncResult object that we can ignore if our function does not return a value.

How do processes pools work in multiprocessing?

Pool allows multiple jobs per process, which may make it easier to parallel your program. If you have a numbers jobs to run in parallel, you can make a Pool with number of processes the same number of as CPU cores and after that pass the list of the numbers jobs to pool. map.

What is a multiprocessing pool?

Functional Programming in Python In this lesson, you'll dive deeper into how you can use multiprocessing. Pool . It creates multiple Python processes in the background and spreads out your computations for you across multiple CPU cores so that they all happen in parallel without you needing to do anything.

How does Python multiprocessing queue work?

Python Multiprocessing modules provides Queue class that is exactly a First-In-First-Out data structure. They can store any pickle Python object (though simple ones are best) and are extremely useful for sharing data between processes.


1 Answers

There is indeed a hint in the docs:

callback should complete immediately since otherwise the thread which handles the results will get blocked.

The callbacks are handled in the main process, but they're run in their own separate thread. When you create a Pool it actually creates a few Thread objects internally:

class Pool(object):     Process = Process      def __init__(self, processes=None, initializer=None, initargs=(),                  maxtasksperchild=None):         self._setup_queues()         self._taskqueue = Queue.Queue()         self._cache = {}         ... # stuff we don't care about         self._worker_handler = threading.Thread(             target=Pool._handle_workers,             args=(self, )             )         self._worker_handler.daemon = True         self._worker_handler._state = RUN          self._worker_handler.start()          self._task_handler = threading.Thread(             target=Pool._handle_tasks,             args=(self._taskqueue, self._quick_put, self._outqueue,                   self._pool, self._cache)             )         self._task_handler.daemon = True         self._task_handler._state = RUN          self._task_handler.start()          self._result_handler = threading.Thread(             target=Pool._handle_results,             args=(self._outqueue, self._quick_get, self._cache)             )         self._result_handler.daemon = True         self._result_handler._state = RUN         self._result_handler.start() 

The interesting thread for us is _result_handler; we'll get to why shortly.

Switching gears for a second, when you run apply_async, it creates an ApplyResult object internally to manage getting the result from the child:

def apply_async(self, func, args=(), kwds={}, callback=None):     assert self._state == RUN     result = ApplyResult(self._cache, callback)     self._taskqueue.put(([(result._job, None, func, args, kwds)], None))     return result  class ApplyResult(object):      def __init__(self, cache, callback):         self._cond = threading.Condition(threading.Lock())         self._job = job_counter.next()         self._cache = cache         self._ready = False         self._callback = callback         cache[self._job] = self       def _set(self, i, obj):         self._success, self._value = obj         if self._callback and self._success:             self._callback(self._value)         self._cond.acquire()         try:             self._ready = True             self._cond.notify()         finally:             self._cond.release()         del self._cache[self._job] 

As you can see, the _set method is the one that ends up actually executing the callback passed in, assuming the task was successful. Also notice that it adds itself to a global cache dict at the end of __init__.

Now, back to the _result_handler thread object. That object calls the _handle_results function, which looks like this:

    while 1:         try:             task = get()         except (IOError, EOFError):             debug('result handler got EOFError/IOError -- exiting')             return          if thread._state:             assert thread._state == TERMINATE             debug('result handler found thread._state=TERMINATE')             break          if task is None:             debug('result handler got sentinel')             break          job, i, obj = task         try:             cache[job]._set(i, obj)  # Here is _set (and therefore our callback) being called!         except KeyError:             pass          # More stuff 

It's a loop that just pulls results from children out of queue, finds the entry for it in cache, and calls _set, which executes our callback. It's able to run even though you're in a loop because it isn't running in the main thread.

like image 191
dano Avatar answered Sep 21 '22 23:09

dano