I am experimenting with the multiprocessing module in Python. I have the sample code below, which executes without any errors in an IPython notebook. But I see that additional Python processes are spawned in the background with each execution of the code block in the notebook.
import multiprocessing as mp
def f(x):
    print "Hello World ", mp.current_process()
    return 1

pool = mp.Pool(3)
data = range(0,10)
pool.map(f, data)
Whereas when I save the same code in a normal .py file and execute it, I encounter errors and have to kill the terminal to stop the program from executing. I have corrected this by creating the pool under an if __name__ == '__main__': guard and also using pool.close() to close the pool.
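For reference, a minimal sketch of what that corrected script could look like as a plain .py file (Python 3 print syntax assumed; the function and data mirror the snippet above):

import multiprocessing as mp

def f(x):
    print("Hello World ", mp.current_process())
    return 1

if __name__ == '__main__':
    pool = mp.Pool(3)
    try:
        results = pool.map(f, range(0, 10))
    finally:
        pool.close()  # no more tasks will be submitted
        pool.join()   # wait for the worker processes to exit
    print(results)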
I am curious to know what best practices one should follow when using multiprocessing and the associated functions such as map, apply, apply_async, etc. I plan to use this module for reading files in parallel and hopefully apply it to a few ML algorithms to speed up the process.
Use the multiprocessing.Pool class when you need to execute tasks that may or may not take arguments and may or may not return a result once they are complete, or when you need to execute different kinds of ad hoc tasks, such as calling different target task functions.
The basic Process pattern is: import the Process class, create a Process object with a target function, start it with the start() method, and finish by waiting on it with the join() method. Arguments can be passed to the target function using the args keyword.
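A minimal sketch of that Process pattern, with a hypothetical display() target function standing in for the one being described:

import multiprocessing as mp

def display(name):
    # hypothetical target function
    print('Hello from', name, mp.current_process())

if __name__ == '__main__':
    p = mp.Process(target=display, args=('worker-1',))  # pass arguments via args
    p.start()  # spawn the child process
    p.join()   # wait for it to finish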
Use cases for multiprocessing: multiprocessing outshines threading in cases where the program is CPU-intensive and doesn't have to do any IO or user interaction. A process may have multiple threads; these threads may share memory and are the units of execution within a process.
However, Python will allow you to set the pool size to cpu_count() or even higher. Since processes only run in parallel on the available cores, setting the maximum number of processes to 20 on a 10-core machine still means that no more than 10 workers can execute simultaneously.
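As a rough sketch of that sizing advice (assuming a CPU-bound workload, where more workers than cores rarely helps):

import multiprocessing as mp

def square(x):
    return x * x

if __name__ == '__main__':
    n_workers = mp.cpu_count()  # one worker per available core
    with mp.Pool(processes=n_workers) as pool:
        results = pool.map(square, range(10))
    print(n_workers, results)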
From my own (also limited) experience I can share the following insights on how multiprocessing works and how to use it. I didn't find the python.org manuals very descriptive or graphic, so I read the code. For everybody who had the same impression, this is what I could piece together so far:
- Processes, or lists of them, are useful to target a few function runs one by one, function-to-process.
- Pools handle the distribution of batchable workloads (high-level tasks/commands) between a set number of Processes (a pool of processes).
- Use Pool for processor-bound tasks (high processor load with batchable inputs/outputs) and pool.ThreadPool for IO-bound tasks (low processor load with separate inputs/outputs).
- Processes, Pools, Threads and ThreadPools use queues.Queue and its subclasses (if result order matters) or Pipes with a 1-to-1 mapping of the PipeConnections to the processes or threads.
- For BaseProxys, Namespaces, Queues, Pools, or for setting up synchronization objects like Barrier/Lock/RLock/Semaphore/Condition between different processes, use the Manager class.
- GILs cannot be avoided; use Manager to handle them and try to separate the calculation-intensive processes from the GIL-related calculations (e.g. parsing in complex data structures, etc.) and connect them with Pipes or shared Queues.
- Multiple Pools can be used to assign different numbers of processes to different tasks; otherwise just implement one Pool() with multiple Pool.(star)map_async() or Pool.(star)map() calls.
- For synchronizing the tasks with each other, the ApplyResult() instance returned by the mapping function, with its methods ApplyResult().ready()/.wait()/.get()/.successful(), is the right choice.
- When import multiprocessing is run, _current_process = MainProcess() is initialized, which is a subclass of BaseProcess but without target, args, kwargs, _parent_pid; it is basically a handle object for all other Processes in the already running python kernel which imports multiprocessing.
- pool.ThreadPool is an analogue API to Pool which probably also shares a similar architecture.
- Pool is based on 3 daemon threads, Pool._task_handler, Pool._worker_handler & Pool._result_handler, which connect with 1 internal queue.Queue() Pool._taskqueue and 2 internal SimpleQueues, Pool._inqueue and Pool._outqueue.
- Pool._cache is a dictionary holding the ApplyResult & subclass instances from all Pool.apply_async()/._map_async() and submethod calls, with the global ApplyResult._job from job_counter() as key.
- The ApplyResults & subclasses of a Pool are found either in Pool._cache or as the return of Pool.apply_async()/._map_async() & submethods.
- The difference between Pool.map() and Pool.map_async() is that Pool.map() == Pool.map_async().get(), which forces/locks the main process to wait for all results to be calculated and stored in the return object ApplyResult().
- The Queues/SimpleQueues in Pool:
  - Pool._taskqueue: pipes the high-level job of Pool.apply_async()/.map_async()/etc., chopped into task batches, from the apply method to the Pool._task_handler.
  - Pool._inqueue: pipes the job as batchwise ?iterator? from the Pool._task_handler to the Pool._pool.Process(target=worker, ...).
  - Pool._outqueue: pipes the results from the Pool._pool.Process(target=worker, ...) (initialized by Pool._worker_handler) to the Pool._result_handler, which in turn _set()s them in the ApplyResults cached in Pool._cache[self._job].
- An ApplyResult will hold the results as a list if the target func has return objects. Otherwise the ApplyResult() is just the handle for the synchronization methods, i.e. the result status call methods.
- The available queue/pipe classes are queues.JoinableQueue, queues.Queue, SimpleQueue and Pipe/PipeConnection. Pipe is just a method returning 2 instances of the actual PipeConnection class.
The following example script demonstrates most of these points:
import logging
import multiprocessing as mp
import random
import time
import numpy as np
from copy import deepcopy
MODEL_INPUTS = ["input_ids", "mc_token_ids", "lm_labels", "mc_labels", "token_type_ids"]
mp.log_to_stderr(level=logging.INFO)  # or mp.log_to_stderr(level=logging.DEBUG)
logger = mp.get_logger()
logger.setLevel(level=logging.INFO)  # or logger.setLevel(level=logging.DEBUG)
def secs2hms(seconds, num_decimals=4):
hms_time = [*(*divmod(divmod(int(seconds), 60)[0], 60), divmod(int(seconds), 60)[1])]
if hasattr(seconds, '__round__'):
hms_time[-1] += seconds.__round__(num_decimals) - int(seconds)
return hms_time
class Timer():
def __init__(self, time_name, log_method=print, time_format='hms', hms_decimals=4):
self.time_name = time_name
        self.output_method = log_method  # use the log method passed in as parameter
self.time_format = time_format
self.hms_decimals = hms_decimals
self.start_time = time.time()
def start(self):
raise RuntimeError('Timer was already started at initialization.')
def stop(self, *args):
seconds_time = time.time() - self.start_time
time_name = self.time_name.format(*args)
if self.time_format == 'hms':
hms_time = secs2hms(seconds=seconds_time, num_decimals=self.hms_decimals)
hms_time = ' '.join([text.format(dt) for dt, text in zip(hms_time, ['{}h', '{}min', '{}sec']) if dt > 0])
self.output_method('{} = {}'.format(time_name, hms_time))
else:
self.output_method('{} = {}sec'.format(time_name, seconds_time))
self._delete_timer()
def _delete_timer(self):
del self
def get_log_method(method_name):
if method_name == 'debug':
log_method = logger.debug
elif method_name == 'info':
log_method = logger.info
else:
log_method = print
return log_method
def _generate_random_array(shape):
return np.array([[[random.randint(0, 1000)
for _ in range(shape[2])]
for _ in range(shape[1])]
for _ in range(shape[0])])
def random_piped_array(shape, pipe_in, log_method_name='print', log_name='RANDOM'):
log_method = get_log_method(method_name=log_method_name)
array = _generate_random_array(shape=shape)
log_method('{}: sending `array through `pipe_in`'.format(log_name))
pipe_in.send(array)
def random_array(shape, log_method_name='print', log_name='RANDOM'):
log_method = get_log_method(method_name=log_method_name)
assert len(shape) == 3
array = _generate_random_array(shape=shape)
log_method('{}: append `array` to `shared_array`'.format(log_name))
# for dataset_name in ['train', 'valid']:
# shared_arrays[dataset_name].append(array)
return array
def random_shared_array(shape, shared_arrays, log_method_name='print', log_name='SHARED_RANDOM'):
log_method = get_log_method(method_name=log_method_name)
assert len(shape) == 3
array = _generate_random_array(shape=shape)
log_method('{}: append `array` to `shared_array`'.format(log_name))
shared_arrays.append(array)
def random_nested_array(shape, nested_shared_arrays, dataset_name, log_method_name='print', log_name='NESTED_RANDOM'):
log_method = get_log_method(method_name=log_method_name)
log_method('{}: appending array to shared_arrays[\'{}\']'.format(log_name, dataset_name))
assert len(shape) == 3
array = _generate_random_array(shape=shape)
    log_method('{}: appending `array` to `shared_array` with currently len(nested_shared_array[\'{}\']) = {}'.format(
log_name, dataset_name, len(nested_shared_arrays[dataset_name])))
nested_shared_arrays[dataset_name].append(array)
def nested_dict_list_deepcopy(nested_shared_arrays):
"""No hierachical switching between mp.manager.BaseProxy and unshared elements"""
nested_unshared_arrays = dict()
for key, shared_list in nested_shared_arrays.items():
nested_unshared_arrays[key] = deepcopy(shared_list)
return nested_unshared_arrays
def log_arrays_state(arrays, log_method_name='print', log_name='ARRAY_STATE'):
log_method = get_log_method(method_name=log_method_name)
log_method('ARRAY_STATE: type(arrays) = {}'.format(type(arrays)))
try:
if hasattr(arrays, '__len__'):
log_method('{}: len(arrays) = {}'.format(log_name, len(arrays)))
if len(arrays) < 20:
for idx, array in enumerate(arrays):
log_method('{}: type(arrays[{}]) = {}'.format(log_name, idx, type(array)))
if hasattr(array, 'shape'):
log_method('{}: arrays[{}].shape = {}'.format(log_name, idx, array.shape))
else:
log_method('{}: arrays[{}] has not `shape` attribute'.format(log_name, idx))
else:
log_method('{}: array has no `__len__` method'.format(log_name))
except BrokenPipeError as error_msg:
log_method('{}: BrokenPipeError: {}'.format(log_name, error_msg))
def log_nested_arrays_state(nested_arrays, log_method_name='print', log_name='NESTED_ARRAY_STATE'):
log_method = get_log_method(method_name=log_method_name)
log_method('{}: type(arrays) = {}'.format(log_name, type(nested_arrays)))
for key, arrays in nested_arrays.items():
log_arrays_state(arrays=arrays, log_name=log_name + '_' + key.upper(), log_method_name=log_method_name)
if __name__ == '__main__':
log_method = logger.info
# log_method cannot be pickled in map_async, therefore an extra log_method_name string is implemented to hand
# through
log_method_name = 'info'
num_samples = 100
num_processes = 1 # len(MODEL_INPUTS) #
array_shapes = [(num_samples, random.randint(2, 5), random.randint(100, 300)) for _ in range(len(MODEL_INPUTS))]
def stdout_some_newlines(num_lines=2, sleep_time=1):
print(''.join(num_lines * ['\n']))
time.sleep(sleep_time)
# Pool with results from `func` with `return` received from `AsyncResult`(=`ApplyResult`)
# `AsyncResult` also used for process synchronization, e.g. waiting for processes to finish
log_method('MAIN: setting up `Pool.map_async` with `return`ing `func`')
async_return_timer = Timer(time_name='TIMER_POOL: time for random array with {} processes'.format(num_processes),
log_method=log_method)
# Pool with variable return
setup_pool_timer = Timer(time_name='TIMER_SETUP: time to set up pool with {} processes'.format(num_processes),
log_method=log_method)
with mp.Pool(processes=num_processes) as pool:
setup_pool_timer.stop()
arrays = pool.starmap_async(func=random_array, iterable=[(shape, log_method_name) for shape in array_shapes])
getted_arrays = arrays.get()
async_return_timer.stop()
# Logging array state inside the `pool` context manager
log_method('MAIN: arrays from `pool.map_async() return` with in the `pool`\'s context manager:')
log_arrays_state(arrays=arrays, log_method_name=log_method_name)
log_method('MAIN: arrays.get() from `pool.map_async() return` with in the `pool`\'s context manager:')
log_arrays_state(arrays=getted_arrays, log_method_name=log_method_name)
# Logging array state outside the `pool` context manager
log_method('MAIN: arrays from `pool.map_async() return` outside the `pool`\'s context manager:')
log_arrays_state(arrays=arrays, log_method_name=log_method_name)
log_method('MAIN: arrays.get() from `pool.map_async() return` outside the `pool`\'s context manager:')
log_arrays_state(arrays=getted_arrays, log_method_name=log_method_name)
del pool, arrays, getted_arrays
stdout_some_newlines()
    # Functionality of `mp.Process().is_alive()`
    log_method('IS_ALIVE: testing functionality of flag `mp.Process().is_alive()` w.r.t. process status')
p = mp.Process(target=lambda x: x ** 2, args=(10,))
log_method('IS_ALIVE: after intializing, before starting: {}'.format(p.is_alive()))
p.start()
log_method('IS_ALIVE: after starting, before joining: p.is_alive() = {}'.format(p.is_alive()))
time.sleep(5)
log_method('IS_ALIVE: after sleeping 5sec, before joining: p.is_alive() = {}'.format(p.is_alive()))
p.join()
log_method('IS_ALIVE: after joining: p.is_alive() = {}'.format(p.is_alive()))
p.terminate()
del p
stdout_some_newlines()
    # Pool with `func` `return`ing results directly to the result handler from `mp.Pool().starmap_async()` of type
    # `AsyncResults()`
log_method(
'MAIN: Pool.map() is not tested explicitly because is equivalent to `Pool.map() == Pool.map_async().get()')
stdout_some_newlines()
# Pool with results assigned to shared variable & `AsyncResult` only used for process synchronization but
# not for result receiving
log_method(
'MAIN: setting up Manager(), Manager.list() as shared variable and Pool.starmap_async with results from shared '
'variable')
async_shared_timer = Timer(
time_name='TIMER_POOL_SHARED: time for random array with {} processes'.format(num_processes),
log_method=log_method)
    setup_shared_variable_timer = Timer(time_name='TIMER_INIT: time to set up shared variable', log_method=log_method)
with mp.Manager() as sync_manager:
shared_arrays = sync_manager.list()
setup_shared_variable_timer.stop()
async_return_timer = Timer(
time_name='TIMER_POOL: time for random array with {} processes'.format(num_processes),
log_method=log_method)
setup_pool_timer = Timer(
time_name='TIMER_POOL_INIT: time to set up pool with {} processes'.format(num_processes),
log_method=log_method)
with mp.Pool(processes=num_processes) as pool:
setup_pool_timer.stop()
async_result = pool.starmap_async(
func=random_shared_array,
iterable=[(shape, shared_arrays, log_method_name) for shape in array_shapes])
            log_method('MAIN: async_result.ready() before async.wait() = {}'.format(async_result.ready()))
            async_result.wait()
            log_method('MAIN: async_result.ready() after async.wait() = {}'.format(async_result.ready()))
            log_method('MAIN: async_result.successful() after async.wait() = {}'.format(async_result.successful()))
async_return_timer.stop()
copy_timer = Timer('TIMER_COPY: time to copy shared_arrays to standard arrays', log_method=log_method)
unshared_arrays = deepcopy(shared_arrays)
copy_timer.stop()
async_shared_timer.stop()
log_method('MAIN: shared_arrays from `pool.map_async()` within `sync_manager` context manager:')
log_arrays_state(arrays=shared_arrays, log_method_name=log_method_name)
log_method(
'MAIN: unshared_arrays = deepcopy(shared_arrays) from `pool.map_async()` within `sync_manager`\'s '
'context manager:')
log_arrays_state(arrays=unshared_arrays, log_method_name=log_method_name)
log_method('MAIN: shared_arrays from `pool.map_async()` outside `sync_manager`\'s context manager:')
log_arrays_state(arrays=shared_arrays, log_method_name=log_method_name)
log_method('MAIN: unshared_arrays from `pool.map_async()` outside `sync_manager`\'s context manager:')
log_arrays_state(arrays=unshared_arrays, log_method_name=log_method_name)
del sync_manager, shared_arrays, async_result, pool, unshared_arrays
stdout_some_newlines()
# Same as above just with pipe instead of `shared_arrays`
log_method('MAIN: separate process outputting to `mp.Pipe()`')
process_pipe_timer = Timer(time_name='TIMER_PIPE: time for `random_pipe_array` outputting through a `mp.Pipe()')
arrays = list()
pipe_in, pipe_out = mp.Pipe()
# initialize processes
processes = [mp.Process(target=random_piped_array, args=(shape, pipe_in, log_method_name)) for shape in
array_shapes]
# Start processes
for process in processes:
process.start()
    # Collect piped arrays from the pipe and append them to `arrays`
while (any([process.is_alive() for process in processes]) or pipe_out.poll()) and len(arrays) < len(MODEL_INPUTS):
log_method(
'RANDOM: receiving arrays through pipe and appending to arrays with currently len(arrays) = {}'.format(
len(arrays)))
arrays.append(pipe_out.recv())
# join processes
for process in processes:
process.join()
process_pipe_timer.stop()
log_arrays_state(arrays=arrays, log_method_name=log_method_name)
pipe_in.close()
pipe_out.close()
del arrays, pipe_in, pipe_out, processes, process
stdout_some_newlines()
# Nested shared dict/list/arrays
log_method('MAIN: `random_nested_arrays` with nested shared `mp.Manager().dict()` and `mp.Manager().list()`s')
nested_timer = Timer(time_name='TIMER_NESTED: time for `random_nested_arrays()`')
with mp.Manager() as sync_manager:
nested_shared_arrays = sync_manager.dict()
nested_shared_arrays['train'] = sync_manager.list()
nested_shared_arrays['valid'] = sync_manager.list()
with mp.Pool(processes=num_processes) as pool:
nested_results = pool.starmap_async(func=random_nested_array,
iterable=[(shape, nested_shared_arrays, dataset_name, log_method_name)
for dataset_name in nested_shared_arrays.keys()
for shape in array_shapes])
nested_results.wait()
unshared_nested_arrays = nested_dict_list_deepcopy(nested_shared_arrays)
nested_timer.stop()
log_nested_arrays_state(nested_arrays=unshared_nested_arrays, log_method_name=log_method_name)
del sync_manager, nested_shared_arrays, pool, nested_results, unshared_nested_arrays
stdout_some_newlines()
# List of processes targeted directly to their functions one by one
log_method(
'MAIN: separate process outputting to shared `mp.Manager.list()` with process handles maintained in list()')
log_method('MAIN: separate process implementations are only preferred over pools for 1-to-1=processes-to-tasks'
' relations or asynchronous single tasks calculations.')
processes_timer = Timer(
time_name='TIMER_PROCESS: time for `random_shared_arrays` with separate {} processes'.format(num_processes),
log_method=log_method)
with mp.Manager() as sync_manager:
shared_arrays = sync_manager.list()
# Initialize processes
processes = [mp.Process(target=random_shared_array, args=(shape, shared_arrays, log_method_name))
for shape in array_shapes]
# Start processes
for process in processes:
process.start()
processes_timer.stop()
# Join processes = wait for processes to finish
for process in processes:
process.join()
unshared_process_arrays = deepcopy(shared_arrays)
processes_timer.stop()
log_arrays_state(arrays=unshared_process_arrays, log_method_name=log_method_name)
del sync_manager, shared_arrays, unshared_process_arrays, processes, process
stdout_some_newlines()
The reason you have to put it in the if __name__ == '__main__': block is that when Python spawns a new process, it effectively imports this module, and would thus try to run any code not inside that block again and again.
Best practice is to keep things in sensibly named, small, testable functions. Have a main() function, which you then call from your if __name__ == '__main__': block.
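A minimal sketch of that layout (the work() function is just a placeholder):

import multiprocessing as mp

def work(item):
    # small, easily testable unit of work
    return item * 2

def main():
    with mp.Pool(processes=4) as pool:
        print(pool.map(work, range(8)))

if __name__ == '__main__':
    main()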
Avoid global state (and module-level variables); it just makes things complicated. Instead, think about passing things to and from your processes. This can be slow, so it is worth thinking first about how to send as little data as possible. For instance, if you have a large config object, rather than sending the whole config object to each process, write your process functions to require only the one or two attributes they actually use, and send just those.
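A sketch of that "send only what you need" idea; the Config class and its field names are hypothetical:

import multiprocessing as mp
from dataclasses import dataclass

@dataclass
class Config:
    input_dir: str
    batch_size: int
    # ... many other fields the workers never touch

def process_batch(input_dir, batch_size):
    # each worker receives two small values instead of the whole Config object
    return (input_dir, batch_size)

if __name__ == '__main__':
    cfg = Config(input_dir='/data', batch_size=32)
    with mp.Pool(processes=2) as pool:
        args = [(cfg.input_dir, cfg.batch_size)] * 4
        print(pool.starmap(process_batch, args))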
It's a lot easier to test things when they happen serially, so writing the code in such a way that it is easy to run sequentially, rather than only through map or the like, can make debugging much easier.
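One way to keep that serial path around is a simple switch (the parallel flag here is just an illustration):

import multiprocessing as mp

def work(item):
    return item * item

def run(items, parallel=False):
    if not parallel:
        return [work(item) for item in items]  # easy to step through and test
    with mp.Pool() as pool:
        return pool.map(work, items)

if __name__ == '__main__':
    assert run(range(5)) == run(range(5), parallel=True)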
It's a good idea to profile things, as the overhead of spawning new processes can sometimes end up being slower than doing everything in one thread. The gevent module is pretty cool too: if your program is network-bound, then gevent can sometimes be a lot quicker at doing things in parallel than multiprocessing.