Best Practices for using the 'multiprocessing' package in Python

I am experimenting with the multiprocessing module in Python. The sample code below runs without any errors in an IPython notebook, but I see that additional Python processes are spawned in the background with each execution of the code block in the notebook.

import multiprocessing as mp

def f(x):
    print "Hello World ", mp.current_process()
    return 1

pool = mp.Pool(3)

data = range(0,10)
pool.map(f, data)

Whereas when I save the same code in a normal .py file and execute it, I run into errors and have to kill the terminal to stop the program from running.

I have corrected this by adding if __name__ == '__main__': and creating the pool under it, and also by using pool.close() to close the pool.

I am curious to know what best practices one should follow when using multiprocessing and the associated functions such as map, apply, apply_async, etc. I plan to use this module for reading files in parallel and hopefully to apply it to a few ML algorithms to speed up the process.
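For reference, a minimal sketch of that fix (the same toy f as above; the pool size of 3 and the input range are just placeholders):

import multiprocessing as mp

def f(x):
    print("Hello World", mp.current_process())
    return 1

if __name__ == '__main__':
    pool = mp.Pool(3)
    try:
        results = pool.map(f, range(10))
    finally:
        pool.close()  # stop accepting new tasks
        pool.join()   # wait for the worker processes to exit
    print(results)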



2 Answers

Overview, architecture & some practical tips

From my own (also limited) experience I can share the following insights on how multiprocessing works and how to use it. I didn't find the python.org manuals very descriptive or graphic, so I read the code. For everybody who had the same impression... this is what I could piece together so far:

General good/best practice tips

  • general implementation methods:
    • test-driven with reduced data sizes: you don't want to wonder for minutes whether the run crashed or is still calculating
    • stepwise & with time profiling:
      • first, implement & debug without multiprocessing
      • next, implement & debug single-processed, profile time & compare overhead without multiple processes
      • next, increase process number & profile time to identify any GIL issues and waiting times.
  • Simple Process instances, or lists of them, are useful for targeting a few function runs one-to-one (one function per process).
  • Pools handle the distribution of batchable workloads (high-level tasks/commands) between a set number of Processes (a pool of processes).
  • Use Pool for processor-bound tasks (high processor load with batchable inputs/outputs) and pool.ThreadPool for IO-bound tasks (low processor load with separate inputs/outputs).
  • For data transfer between Processes, Pools, Threads and ThreadPools use queues.Queue and subclasses (if result order matters) or Pipes with a 1-to-1 mapping of the PipeConnections to the processes or threads.
  • For sharing variables of different types (BaseProxy, Namespaces, Queues, Pools) or for setting up synchronization objects like Barrier/Lock/RLock/Semaphore/Condition between different processes, use the Manager class.
  • In case the GIL cannot be avoided, use Manager to handle it, try to separate the intense calculation processes from the GIL-related calculations (e.g. parsing into complex data structures, etc.), and connect them with Pipes or shared Queues.
  • Multiple Pools can be used to assign different numbers of processes to different tasks. Otherwise just implement one Pool with multiple mapping or apply method calls.
  • Sequential parallel computing tasks that build on each other's intermediate results can be calculated with a single Pool() and multiple Pool.(star)map_async() or Pool.(star)map() calls. For synchronizing the tasks with each other, the ApplyResult() instance returned by the mapping function, with its methods ApplyResult().ready()/.wait()/.get()/.successful(), is the right choice (see the sketch after this list).
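As an illustration of the last point, a minimal sketch of the map_async()/result-synchronization pattern (cpu_task and its inputs are hypothetical placeholders):

import multiprocessing as mp

def cpu_task(n):
    # hypothetical processor-bound task
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    with mp.Pool(processes=4) as pool:
        async_result = pool.map_async(cpu_task, range(10_000, 10_010))
        # ... the main process is free to do other work here ...
        async_result.wait()                    # block until all tasks are finished
        if async_result.ready() and async_result.successful():
            results = async_result.get()       # same list Pool.map() would return
            print(len(results), results[0])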

Architecture and process flow

  • When import multiprocessing is run, _current_process = MainProcess() is initialized, which is a subclass of BaseProcess but without target, args, kwargs or _parent_pid; it is basically a handle object for all other Processes in the already running Python kernel that imports multiprocessing.
  • pool.ThreadPool is an analogous API to Pool which probably also shares a similar architecture.
  • Pool is based on 3 daemon threads Pool._task_handler, Pool._worker_handler & Pool._result_handler which connect with 1 internal queue.Queue() Pool._taskqueue and 2 internal SimpleQueues Pool._inqueue and Pool._outqueue.
  • Pool._cache is a dictionary holding the ApplyResults & subclasses instances from all Pool.apply_async()/._map_async() and submethod calls, with the global ApplyResults._job from job_counter() as key.
  • ApplyResults & subclasses of a Pool are found either in Pool._cache and as return of Pool.apply_async()/._map_async() & submethods.
  • The difference between Pool.map() and Pool.map_async() is that Pool.map() == Pool.map_async().get() which forces/locks the main process to wait for all results being calculated and stored in the return object ApplyResult().
  • The Queue/SimpleQueues in `Pool`:
    • Pool.taskqueue: pipes the highlevel job of Pool.apply_async()/.map_async()/etc. chopped to task batches from the apply-method to the Pool._task_handler.
    • Pool._inqueue: pipes the job as batchwise ?iterator? from the Pool._task_handler to the Pool._pool.Process(target=worker, ...)
    • Pool._outqueue: pipes the results from the Pool._pool.Process(target=worker, ...) (initialized by Pool._worker_handler) to the Pool._result_handler, which again _set()s them in the ApplyResults cached in Pool._cache[self._job].
  • ApplyResult will hold the results as a list if the target func has return values. Otherwise the ApplyResult() is just the handle for the synchronization methods, i.e. the result-status call methods.
  • For connecting processes and threads, 4 classes are offered from rich to simple functionality in the following order: queues.JoinableQueue, queues.Queue, SimpleQueue, Pipe/PipeConnection. Pipe is just a method returning 2 of the actual PipeConnection class instances (see the sketch below).
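A minimal sketch of the 1-to-1 Pipe/PipeConnection pattern mentioned above (the producer function and its payload are placeholders):

import multiprocessing as mp

def producer(conn):
    # hypothetical worker pushing one payload through its end of the pipe
    conn.send([1, 2, 3])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = mp.Pipe()   # returns the 2 connection endpoints
    p = mp.Process(target=producer, args=(child_conn,))
    p.start()
    print(parent_conn.recv())             # blocks until the payload arrives
    p.join()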

Some code examples

import logging
import multiprocessing as mp
import random
import time
import numpy as np
from copy import deepcopy

MODEL_INPUTS = ["input_ids", "mc_token_ids", "lm_labels", "mc_labels", "token_type_ids"]

mp.log_to_stderr(level=logging.INFO)  # mp.log_to_stderr(level=logging.DEBUG)
logger = mp.get_logger()
logger.setLevel(level=logging.INFO)  # logger.setLevel(level=logging.DEBUG)


def secs2hms(seconds, num_decimals=4):
    hms_time = [*(*divmod(divmod(int(seconds), 60)[0], 60), divmod(int(seconds), 60)[1])]
    if hasattr(seconds, '__round__'):
        hms_time[-1] += seconds.__round__(num_decimals) - int(seconds)
    return hms_time


class Timer():
    def __init__(self, time_name, log_method=print, time_format='hms', hms_decimals=4):
        self.time_name = time_name
        # accept either a callable (e.g. logger.info) or a method name string ('debug'/'info')
        self.output_method = log_method if callable(log_method) else get_log_method(method_name=log_method)
        self.time_format = time_format
        self.hms_decimals = hms_decimals
        self.start_time = time.time()

    def start(self):
        raise RuntimeError('Timer was already started at initialization.')

    def stop(self, *args):
        seconds_time = time.time() - self.start_time
        time_name = self.time_name.format(*args)
        if self.time_format == 'hms':
            hms_time = secs2hms(seconds=seconds_time, num_decimals=self.hms_decimals)
            hms_time = ' '.join([text.format(dt) for dt, text in zip(hms_time, ['{}h', '{}min', '{}sec']) if dt > 0])
            self.output_method('{} = {}'.format(time_name, hms_time))
        else:
            self.output_method('{} = {}sec'.format(time_name, seconds_time))
        self._delete_timer()

    def _delete_timer(self):
        del self


def get_log_method(method_name):
    if method_name == 'debug':
        log_method = logger.debug
    elif method_name == 'info':
        log_method = logger.info
    else:
        log_method = print
    return log_method


def _generate_random_array(shape):
    return np.array([[[random.randint(0, 1000)
                       for _ in range(shape[2])]
                      for _ in range(shape[1])]
                     for _ in range(shape[0])])


def random_piped_array(shape, pipe_in, log_method_name='print', log_name='RANDOM'):
    log_method = get_log_method(method_name=log_method_name)
    array = _generate_random_array(shape=shape)
    log_method('{}: sending `array` through `pipe_in`'.format(log_name))
    pipe_in.send(array)


def random_array(shape, log_method_name='print', log_name='RANDOM'):
    log_method = get_log_method(method_name=log_method_name)
    assert len(shape) == 3
    array = _generate_random_array(shape=shape)
    log_method('{}: returning `array`'.format(log_name))
    # for dataset_name in ['train', 'valid']:
    #     shared_arrays[dataset_name].append(array)
    return array


def random_shared_array(shape, shared_arrays, log_method_name='print', log_name='SHARED_RANDOM'):
    log_method = get_log_method(method_name=log_method_name)
    assert len(shape) == 3
    array = _generate_random_array(shape=shape)
    log_method('{}: append `array` to `shared_array`'.format(log_name))
    shared_arrays.append(array)


def random_nested_array(shape, nested_shared_arrays, dataset_name, log_method_name='print', log_name='NESTED_RANDOM'):
    log_method = get_log_method(method_name=log_method_name)
    log_method('{}: appending array to shared_arrays[\'{}\']'.format(log_name, dataset_name))
    assert len(shape) == 3
    array = _generate_random_array(shape=shape)
    log_method('{}: appending `array` to `shared_array` with currently len(nested_shared_array[\'{}\']) = {}'.format(
        log_name, dataset_name, len(nested_shared_arrays[dataset_name])))
    nested_shared_arrays[dataset_name].append(array)


def nested_dict_list_deepcopy(nested_shared_arrays):
    """No hierachical switching between mp.manager.BaseProxy and unshared elements"""
    nested_unshared_arrays = dict()
    for key, shared_list in nested_shared_arrays.items():
        nested_unshared_arrays[key] = deepcopy(shared_list)
    return nested_unshared_arrays


def log_arrays_state(arrays, log_method_name='print', log_name='ARRAY_STATE'):
    log_method = get_log_method(method_name=log_method_name)
    log_method('{}: type(arrays) = {}'.format(log_name, type(arrays)))
    try:
        if hasattr(arrays, '__len__'):
            log_method('{}: len(arrays) = {}'.format(log_name, len(arrays)))
            if len(arrays) < 20:
                for idx, array in enumerate(arrays):
                    log_method('{}: type(arrays[{}]) = {}'.format(log_name, idx, type(array)))
                    if hasattr(array, 'shape'):
                        log_method('{}: arrays[{}].shape = {}'.format(log_name, idx, array.shape))
                    else:
                        log_method('{}: arrays[{}] has no `shape` attribute'.format(log_name, idx))
        else:
            log_method('{}: array has no `__len__` method'.format(log_name))

    except BrokenPipeError as error_msg:
        log_method('{}: BrokenPipeError: {}'.format(log_name, error_msg))


def log_nested_arrays_state(nested_arrays, log_method_name='print', log_name='NESTED_ARRAY_STATE'):
    log_method = get_log_method(method_name=log_method_name)
    log_method('{}: type(arrays) = {}'.format(log_name, type(nested_arrays)))
    for key, arrays in nested_arrays.items():
        log_arrays_state(arrays=arrays, log_name=log_name + '_' + key.upper(), log_method_name=log_method_name)


if __name__ == '__main__':
    log_method = logger.info
    # `log_method` cannot be pickled by `map_async`, therefore an extra `log_method_name` string is passed
    # through instead
    log_method_name = 'info'
    num_samples = 100
    num_processes = 1  # len(MODEL_INPUTS)  #
    array_shapes = [(num_samples, random.randint(2, 5), random.randint(100, 300)) for _ in range(len(MODEL_INPUTS))]


    def stdout_some_newlines(num_lines=2, sleep_time=1):
        print(''.join(num_lines * ['\n']))
        time.sleep(sleep_time)

    # Pool with results from `func` with `return` received from `AsyncResult`(=`ApplyResult`)
    # `AsyncResult` also used for process synchronization, e.g. waiting for processes to finish
    log_method('MAIN: setting up `Pool.map_async` with `return`ing `func`')
    async_return_timer = Timer(time_name='TIMER_POOL: time for random array with {} processes'.format(num_processes),
                               log_method=log_method)
    # Pool with variable return
    setup_pool_timer = Timer(time_name='TIMER_SETUP: time to set up pool with {} processes'.format(num_processes),
                             log_method=log_method)
    with mp.Pool(processes=num_processes) as pool:
        setup_pool_timer.stop()
        arrays = pool.starmap_async(func=random_array, iterable=[(shape, log_method_name) for shape in array_shapes])
        getted_arrays = arrays.get()
        async_return_timer.stop()
        # Logging array state inside the `pool` context manager
        log_method('MAIN: arrays from `pool.map_async() return` within the `pool`\'s context manager:')
        log_arrays_state(arrays=arrays, log_method_name=log_method_name)
        log_method('MAIN: arrays.get() from `pool.map_async() return` within the `pool`\'s context manager:')
        log_arrays_state(arrays=getted_arrays, log_method_name=log_method_name)
    # Logging array state outside the `pool` context manager
    log_method('MAIN: arrays from `pool.map_async() return` outside the `pool`\'s context manager:')
    log_arrays_state(arrays=arrays, log_method_name=log_method_name)
    log_method('MAIN: arrays.get() from `pool.map_async() return` outside the `pool`\'s context manager:')
    log_arrays_state(arrays=getted_arrays, log_method_name=log_method_name)
    del pool, arrays, getted_arrays
    stdout_some_newlines()


    # Functionality of `mp.Process().is_alive()`
    log_method('IS_ALIVE: testing functionality of flag `mp.Process().is_alive()` w.r.t. process status')
    p = mp.Process(target=lambda x: x ** 2, args=(10,))
    log_method('IS_ALIVE: after initializing, before starting: p.is_alive() = {}'.format(p.is_alive()))
    p.start()
    log_method('IS_ALIVE: after starting, before joining: p.is_alive() = {}'.format(p.is_alive()))
    time.sleep(5)
    log_method('IS_ALIVE: after sleeping 5sec, before joining: p.is_alive() = {}'.format(p.is_alive()))
    p.join()
    log_method('IS_ALIVE: after joining: p.is_alive() = {}'.format(p.is_alive()))
    p.terminate()
    del p
    stdout_some_newlines()

    # Pool with `func` `return`ing results directly to the result handler from `mp.Pool().starmap_async()` of type
    # `AsyncResults()`
    log_method(
        'MAIN: Pool.map() is not tested explicitly because it is equivalent to `Pool.map() == Pool.map_async().get()`')
    stdout_some_newlines()


    # Pool with results assigned to shared variable & `AsyncResult` only used for process synchronization but
    # not for result receiving

    log_method(
        'MAIN: setting up Manager(), Manager.list() as shared variable and Pool.starmap_async with results from shared '
        'variable')
    async_shared_timer = Timer(
        time_name='TIMER_POOL_SHARED: time for random array with {} processes'.format(num_processes),
        log_method=log_method)
    setup_shared_variable_timer = Timer(time_name='TIMER_INIT: time to set up shared variable', log_method=log_method)
    with mp.Manager() as sync_manager:
        shared_arrays = sync_manager.list()
        setup_shared_variable_timer.stop()
        async_return_timer = Timer(
            time_name='TIMER_POOL: time for random array with {} processes'.format(num_processes),
            log_method=log_method)
        setup_pool_timer = Timer(
            time_name='TIMER_POOL_INIT: time to set up pool with {} processes'.format(num_processes),
            log_method=log_method)
        with mp.Pool(processes=num_processes) as pool:
            setup_pool_timer.stop()
            async_result = pool.starmap_async(
                func=random_shared_array,
                iterable=[(shape, shared_arrays, log_method_name) for shape in array_shapes])
            log_method('MAIN: async_result.ready() before async.wait() = {}'.format(async_result.ready()))
            async_result.wait()
            log_method('MAIN: async_result.ready() after async.wait() = {}'.format(async_result.ready()))
            log_method('MAIN: async_result.successful() after async.wait() = {}'.format(async_result.successful()))
            async_return_timer.stop()

            copy_timer = Timer('TIMER_COPY: time to copy shared_arrays to standard arrays', log_method=log_method)
            unshared_arrays = deepcopy(shared_arrays)
            copy_timer.stop()
            async_shared_timer.stop()
            log_method('MAIN: shared_arrays from `pool.map_async()` within `sync_manager` context manager:')
            log_arrays_state(arrays=shared_arrays, log_method_name=log_method_name)
            log_method(
                'MAIN: unshared_arrays = deepcopy(shared_arrays) from `pool.map_async()` within `sync_manager`\'s '
                'context manager:')
            log_arrays_state(arrays=unshared_arrays, log_method_name=log_method_name)

    log_method('MAIN: shared_arrays from `pool.map_async()` outside `sync_manager`\'s context manager:')
    log_arrays_state(arrays=shared_arrays, log_method_name=log_method_name)
    log_method('MAIN: unshared_arrays from `pool.map_async()` outside `sync_manager`\'s context manager:')
    log_arrays_state(arrays=unshared_arrays, log_method_name=log_method_name)
    del sync_manager, shared_arrays, async_result, pool, unshared_arrays
    stdout_some_newlines()

    # Same as above just with pipe instead of `shared_arrays`
    log_method('MAIN: separate process outputting to `mp.Pipe()`')
    process_pipe_timer = Timer(time_name='TIMER_PIPE: time for `random_piped_array` outputting through an `mp.Pipe()`')
    arrays = list()
    pipe_in, pipe_out = mp.Pipe()
    # initialize processes
    processes = [mp.Process(target=random_piped_array, args=(shape, pipe_in, log_method_name)) for shape in
                 array_shapes]
    # Start processes
    for process in processes:
        process.start()
    # Collect piped arrays from the pipe and append them to `arrays`
    while (any([process.is_alive() for process in processes]) or pipe_out.poll()) and len(arrays) < len(MODEL_INPUTS):
        log_method(
            'RANDOM: receiving arrays through pipe and appending to arrays with currently len(arrays) = {}'.format(
                len(arrays)))
        arrays.append(pipe_out.recv())
    # join processes
    for process in processes:
        process.join()
    process_pipe_timer.stop()
    log_arrays_state(arrays=arrays, log_method_name=log_method_name)
    pipe_in.close()
    pipe_out.close()
    del arrays, pipe_in, pipe_out, processes, process
    stdout_some_newlines()

    # Nested shared dict/list/arrays
    log_method('MAIN: `random_nested_arrays` with nested shared `mp.Manager().dict()` and `mp.Manager().list()`s')
    nested_timer = Timer(time_name='TIMER_NESTED: time for `random_nested_arrays()`')
    with mp.Manager() as sync_manager:
        nested_shared_arrays = sync_manager.dict()
        nested_shared_arrays['train'] = sync_manager.list()
        nested_shared_arrays['valid'] = sync_manager.list()
        with mp.Pool(processes=num_processes) as pool:
            nested_results = pool.starmap_async(func=random_nested_array,
                                                iterable=[(shape, nested_shared_arrays, dataset_name, log_method_name)
                                                          for dataset_name in nested_shared_arrays.keys()
                                                          for shape in array_shapes])
            nested_results.wait()
            unshared_nested_arrays = nested_dict_list_deepcopy(nested_shared_arrays)
            nested_timer.stop()
    log_nested_arrays_state(nested_arrays=unshared_nested_arrays, log_method_name=log_method_name)
    del sync_manager, nested_shared_arrays, pool, nested_results, unshared_nested_arrays
    stdout_some_newlines()

    # List of processes targeted directly to their functions one by one
    log_method(
        'MAIN: separate process outputting to shared `mp.Manager.list()` with process handles maintained in list()')
    log_method('MAIN: separate process implementations are only preferred over pools for 1-to-1 processes-to-tasks'
               ' relations or asynchronous single-task calculations.')
    processes_timer = Timer(
        time_name='TIMER_PROCESS: time for `random_shared_arrays` with separate {} processes'.format(num_processes),
        log_method=log_method)
    with mp.Manager() as sync_manager:
        shared_arrays = sync_manager.list()
        # Initialize processes
        processes = [mp.Process(target=random_shared_array, args=(shape, shared_arrays, log_method_name))
                     for shape in array_shapes]
        # Start processes
        for process in processes:
            process.start()
        processes_timer.stop()
        # Join processes = wait for processes to finish
        for process in processes:
            process.join()
        unshared_process_arrays = deepcopy(shared_arrays)
        processes_timer.stop()
    log_arrays_state(arrays=unshared_process_arrays, log_method_name=log_method_name)
    del sync_manager, shared_arrays, unshared_process_arrays, processes, process
    stdout_some_newlines()
(answer by drstoop)

The reason you have to put it in if __name__ ... is that when Python spawns a new process, it effectively imports this module, and would thus try to run any code not in the if __name__ block again and again.

Best practice is to keep things in sensibly named, small, testable functions. Have a 'main()' function, which you then call from your if __name__ block.

Avoid global state (and module-level variables). It just makes things complicated. Instead, think about passing things to and from your processes. This can be slow, so thinking first about how to send as little data as possible is useful. For instance, if you have a large config object, rather than sending the whole config object to each process, split your process functions so that they only require the one or two attributes they actually use, and send just those (a minimal sketch below).
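A minimal sketch of that idea (Config, process_file and the file paths are hypothetical): ship only the one or two attributes each worker actually uses, not the whole config object.

import multiprocessing as mp

class Config:
    # hypothetical config object: only chunk_size and encoding are needed by workers
    def __init__(self):
        self.chunk_size = 1024
        self.encoding = 'utf-8'
        self.huge_lookup_table = list(range(1_000_000))  # expensive to pickle, not needed by workers

def process_file(args):
    path, chunk_size, encoding = args  # receives only what it needs
    # ... open and process `path` here ...
    return path, chunk_size, encoding

def main():
    config = Config()
    paths = ['a.txt', 'b.txt', 'c.txt']
    tasks = [(p, config.chunk_size, config.encoding) for p in paths]
    with mp.Pool(2) as pool:
        print(pool.map(process_file, tasks))

if __name__ == '__main__':
    main()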

It's a lot easier to test things when they run serially, so writing your code in such a way that it's easy to run it sequentially rather than through map or the like makes testing simpler (see the sketch below).
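For example, a minimal sketch with a hypothetical use_pool flag: the same worker runs through the builtin map for easy serial testing, or through pool.map for the real parallel run.

import multiprocessing as mp

def square(x):
    # trivial stand-in for the real worker function
    return x * x

def run(data, use_pool=False):
    if use_pool:
        with mp.Pool(4) as pool:
            return pool.map(square, data)
    return list(map(square, data))  # serial path: easy to step through and test

if __name__ == '__main__':
    assert run(range(5)) == run(range(5), use_pool=True)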

It's a good idea to profile things, as the overhead of spawning new processes can sometimes end up being slower than doing everything in one thread. The gevent module is pretty cool too: if your program is network-bound, then gevent can sometimes be a lot quicker at doing things in parallel than multiprocessing (a minimal sketch below).
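A minimal gevent sketch for a network-bound case (fetch and the URL list are placeholders; assumes gevent is installed and monkey-patching is acceptable in your program):

import gevent
from gevent import monkey
monkey.patch_all()  # patch blocking stdlib IO so it cooperates with gevent

import urllib.request

def fetch(url):
    # hypothetical network-bound task
    with urllib.request.urlopen(url) as response:
        return len(response.read())

if __name__ == '__main__':
    urls = ['http://example.com'] * 3
    jobs = [gevent.spawn(fetch, url) for url in urls]
    gevent.joinall(jobs, timeout=10)
    print([job.value for job in jobs])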

(answer by Daniel Fairhead)