
Difference between apply() and apply_async() in Python multiprocessing module

I currently have a piece of code which spawns multiple processes as follows:

pool = Pool(processes=None)
results = [pool.apply(f, args=(arg1, arg2, arg3)) for arg3 in arg_list]

My idea was that this would divide the work across cores, using all available cores since processes=None. However, the documentation for Pool.apply() in the multiprocessing module reads:

Equivalent of the apply() built-in function. It blocks until the result is ready, so apply_async() is better suited for performing work in parallel. Additionally, func is only executed in one of the workers of the pool.

First question: I don't clearly understand this. How does apply distribute the work across workers, and in what way is it different from what apply_async does? If the tasks get distributed across workers, how is it possible that func is only executed in one of the workers?

My guess: my guess is that apply, in my current implementation, gives a task to a worker with a certain set of arguments, waits for that worker to finish, and then gives the next set of arguments to another worker. In this way work is sent to different processes, yet no parallelism takes place. This seems to be the case, since apply is in fact just:

def apply(self, func, args=(), kwds={}):
    '''
    Equivalent of `func(*args, **kwds)`.
    Pool must be running.
    '''
    return self.apply_async(func, args, kwds).get()

Second question: I would also like to understand better why, in the introduction of the docs, section 16.6.1.5. ('Using a pool of workers'), they say that even a construction with apply_async such as [pool.apply_async(os.getpid, ()) for i in range(4)] may use more processes, but this is not guaranteed. What decides whether multiple processes will be used?

Pietro Marchesi asked Feb 15 '18



1 Answer

You have pointed to the Python 2.7 documentation, so I'm going to base my answer on the Python 2.7 multiprocessing implementation. It may differ slightly in Python 3.x, but not by much.

Difference between apply and apply_async

The difference between these two is self-explanatory once you see how they are actually implemented. Here I'm going to copy/paste the code from multiprocessing/pool.py for both functions.

def apply(self, func, args=(), kwds={}):
    '''
    Equivalent of `apply()` builtin
    '''
    assert self._state == RUN
    return self.apply_async(func, args, kwds).get()

As you can see, apply actually calls apply_async, but calls get on the result before returning. This is what makes apply block until the result is ready.

def apply_async(self, func, args=(), kwds={}, callback=None):
    '''
    Asynchronous equivalent of `apply()` builtin
    '''
    assert self._state == RUN
    result = ApplyResult(self._cache, callback)
    self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
    return result

apply_async enqueues the task in the task queue and returns a handle for the submitted task. With that handle you can call get to retrieve the result, or wait to wait for the task to finish. After the task finishes, its return value is passed as an argument to the callback function.

Example:

from multiprocessing import Pool
from time import sleep


def callback(a):
    print a


def worker(i, n):
    print 'Entering worker ', i
    sleep(n)
    print 'Exiting worker'
    return 'worker_response'


if __name__ == '__main__':
    pool = Pool(4)
    a = [pool.apply_async(worker, (i, 4), callback=callback) for i in range(8)]
    for i in a:
        i.wait()

Results:

Entering worker  0
Entering worker  1
Entering worker  2
Entering worker  3
Exiting worker
Exiting worker
Exiting worker
Exiting worker
Entering worker  4
Entering worker  5
worker_response
Entering worker  6
worker_response
Entering worker  7
worker_response
worker_response
Exiting worker
Exiting worker
Exiting worker
Exiting worker
worker_response
worker_response
worker_response
worker_response

Pay attention that, when using apply_async, you have to wait for the results or wait for the tasks to finish. If you do not (i.e., if you comment out the last two lines of my example), your script will exit immediately after you run it.

Why apply_async may use more processes

This follows from how apply works. apply sends one task at a time to an available process in the Pool and blocks until it is done, while apply_async adds tasks to the task queue, and the task-queue thread hands them out to whichever processes in the Pool are free. That is why more than one process may be running at once when you use apply_async.

I went through this section a couple of times to better understand the idea the author tried to convey. Let's look at it here:

# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print res.get(timeout=1)              # prints the PID of that process

# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print [res.get(timeout=1) for res in multiple_results]

If we interpret the last example in light of the previous one: when you have multiple successive calls to apply_async, several of them may well run at the same time. Whether they do depends on how many processes in the Pool are busy at that moment. That is why the docs say may.

Ilija answered Sep 21 '22