
Understanding Multiprocessing: Shared Memory Management, Locks and Queues in Python

Multiprocessing is a powerful tool in Python, and I want to understand it in more depth. I want to know when to use regular Locks and Queues and when to use a multiprocessing Manager to share them among all processes.

I came up with the following testing scenarios with four different conditions for multiprocessing:

  1. Using a pool and NO Manager

  2. Using a pool and a Manager

  3. Using individual processes and NO Manager

  4. Using individual processes and a Manager

The Job

All conditions execute a job function, the_job. It does some printing, which is guarded by a lock, and then puts its input into a queue (to see whether it can be recovered from the queue afterwards). The input is simply an index idx from range(10), created in the function start_scenario of the main script (shown at the bottom).

    def the_job(args):
        """The job for multiprocessing.

        Prints some stuff secured by a lock and
        finally puts the input into a queue.

        """
        idx = args[0]
        lock = args[1]
        queue = args[2]

        lock.acquire()
        print 'I'
        print 'was '
        print 'here '
        print '!!!!'
        print '1111'
        print 'einhundertelfzigelf\n'
        who = ' By run %d \n' % idx
        print who
        lock.release()

        queue.put(idx)

The success of a condition is defined as perfectly recalling the input from the queue, see the function read_queue at the bottom.

The Conditions

Conditions 1 and 2 are rather self-explanatory. Condition 1 involves creating a lock and a queue, and passing these to a process pool:

    def scenario_1_pool_no_manager(jobfunc, args, ncores):
        """Runs a pool of processes WITHOUT a Manager for the lock and queue.

        FAILS!

        """
        mypool = mp.Pool(ncores)
        lock = mp.Lock()
        queue = mp.Queue()

        iterator = make_iterator(args, lock, queue)

        mypool.imap(jobfunc, iterator)

        mypool.close()
        mypool.join()

        return read_queue(queue)

(The helper function make_iterator is given at the bottom of this post.) Condition 1 fails with RuntimeError: Lock objects should only be shared between processes through inheritance.

Condition 2 is rather similar but now the lock and queue are under the supervision of a manager:

    def scenario_2_pool_manager(jobfunc, args, ncores):
        """Runs a pool of processes WITH a Manager for the lock and queue.

        SUCCESSFUL!

        """
        mypool = mp.Pool(ncores)
        lock = mp.Manager().Lock()
        queue = mp.Manager().Queue()

        iterator = make_iterator(args, lock, queue)
        mypool.imap(jobfunc, iterator)
        mypool.close()
        mypool.join()

        return read_queue(queue)

In condition 3 new processes are started manually, and the lock and queue are created without a manager:

    def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
        """Runs an individual process for every task WITHOUT a Manager,

        SUCCESSFUL!

        """
        lock = mp.Lock()
        queue = mp.Queue()

        iterator = make_iterator(args, lock, queue)

        do_job_single_processes(jobfunc, iterator, ncores)

        return read_queue(queue)

Condition 4 is similar but again now using a manager:

    def scenario_4_single_processes_manager(jobfunc, args, ncores):
        """Runs an individual process for every task WITH a Manager,

        SUCCESSFUL!

        """
        lock = mp.Manager().Lock()
        queue = mp.Manager().Queue()

        iterator = make_iterator(args, lock, queue)

        do_job_single_processes(jobfunc, iterator, ncores)

        return read_queue(queue)

In both conditions - 3 and 4 - I start a new process for each of the 10 tasks of the_job with at most ncores processes operating at the very same time. This is achieved with the following helper function:

    def do_job_single_processes(jobfunc, iterator, ncores):
        """Runs a job function by starting individual processes for every task.

        At most `ncores` processes operate at the same time

        :param jobfunc: Job to do

        :param iterator:

            Iterator over different parameter settings,
            contains a lock and a queue

        :param ncores:

            Number of processes operating at the same time

        """
        keep_running = True
        process_dict = {}  # Dict containing all subprocesses

        while len(process_dict) > 0 or keep_running:

            terminated_procs_pids = []
            # First check if some processes did finish their job
            for pid, proc in process_dict.iteritems():

                # Remember the terminated processes
                if not proc.is_alive():
                    terminated_procs_pids.append(pid)

            # And delete these from the process dict
            for terminated_proc in terminated_procs_pids:
                process_dict.pop(terminated_proc)

            # If we have fewer active processes than ncores and there is still
            # a job to do, add another process
            if len(process_dict) < ncores and keep_running:
                try:
                    task = iterator.next()
                    proc = mp.Process(target=jobfunc,
                                      args=(task,))
                    proc.start()
                    process_dict[proc.pid] = proc
                except StopIteration:
                    # All tasks have been started
                    keep_running = False

            time.sleep(0.1)

The Outcome

Only condition 1 fails (RuntimeError: Lock objects should only be shared between processes through inheritance), whereas the other 3 conditions are successful. I am trying to wrap my head around this outcome.

Why does the pool need to share a lock and queue between all processes but the individual processes from condition 3 don't?

What I know is that for the pool conditions (1 and 2) all data from the iterators is passed via pickling, whereas in the single-process conditions (3 and 4) all data from the iterators is passed by inheritance from the main process (I am using Linux). I guess that until the memory is changed from within a child process, the child accesses the same memory that the parent process uses (copy-on-write). But as soon as one calls lock.acquire(), this memory should change, and the child processes should then use different locks placed somewhere else in memory, shouldn't they? How does one child process know that a sibling has acquired a lock that is not shared via a manager?

Finally, a somewhat related question is how different conditions 3 and 4 really are. Both use individual processes, but they differ in the usage of a manager. Are both considered valid code? Or should one avoid using a manager if there is actually no need for one?


Full Script

For those who simply want to copy and paste everything to execute the code, here is the full script:

    __author__ = 'Me and myself'

    import multiprocessing as mp
    import time

    def the_job(args):
        """The job for multiprocessing.

        Prints some stuff secured by a lock and
        finally puts the input into a queue.

        """
        idx = args[0]
        lock = args[1]
        queue = args[2]

        lock.acquire()
        print 'I'
        print 'was '
        print 'here '
        print '!!!!'
        print '1111'
        print 'einhundertelfzigelf\n'
        who = ' By run %d \n' % idx
        print who
        lock.release()

        queue.put(idx)


    def read_queue(queue):
        """Turns a queue into a normal python list."""
        results = []
        while not queue.empty():
            result = queue.get()
            results.append(result)
        return results


    def make_iterator(args, lock, queue):
        """Makes an iterator over args and passes the lock and queue to each element."""
        return ((arg, lock, queue) for arg in args)


    def start_scenario(scenario_number=1):
        """Starts one of four multiprocessing scenarios.

        :param scenario_number: Index of scenario, 1 to 4

        """
        args = range(10)
        ncores = 3
        if scenario_number == 1:
            result = scenario_1_pool_no_manager(the_job, args, ncores)

        elif scenario_number == 2:
            result = scenario_2_pool_manager(the_job, args, ncores)

        elif scenario_number == 3:
            result = scenario_3_single_processes_no_manager(the_job, args, ncores)

        elif scenario_number == 4:
            result = scenario_4_single_processes_manager(the_job, args, ncores)

        if result != args:
            print 'Scenario %d fails: %s != %s' % (scenario_number, args, result)
        else:
            print 'Scenario %d successful!' % scenario_number


    def scenario_1_pool_no_manager(jobfunc, args, ncores):
        """Runs a pool of processes WITHOUT a Manager for the lock and queue.

        FAILS!

        """
        mypool = mp.Pool(ncores)
        lock = mp.Lock()
        queue = mp.Queue()

        iterator = make_iterator(args, lock, queue)

        mypool.map(jobfunc, iterator)

        mypool.close()
        mypool.join()

        return read_queue(queue)


    def scenario_2_pool_manager(jobfunc, args, ncores):
        """Runs a pool of processes WITH a Manager for the lock and queue.

        SUCCESSFUL!

        """
        mypool = mp.Pool(ncores)
        lock = mp.Manager().Lock()
        queue = mp.Manager().Queue()

        iterator = make_iterator(args, lock, queue)
        mypool.map(jobfunc, iterator)
        mypool.close()
        mypool.join()

        return read_queue(queue)


    def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
        """Runs an individual process for every task WITHOUT a Manager,

        SUCCESSFUL!

        """
        lock = mp.Lock()
        queue = mp.Queue()

        iterator = make_iterator(args, lock, queue)

        do_job_single_processes(jobfunc, iterator, ncores)

        return read_queue(queue)


    def scenario_4_single_processes_manager(jobfunc, args, ncores):
        """Runs an individual process for every task WITH a Manager,

        SUCCESSFUL!

        """
        lock = mp.Manager().Lock()
        queue = mp.Manager().Queue()

        iterator = make_iterator(args, lock, queue)

        do_job_single_processes(jobfunc, iterator, ncores)

        return read_queue(queue)


    def do_job_single_processes(jobfunc, iterator, ncores):
        """Runs a job function by starting individual processes for every task.

        At most `ncores` processes operate at the same time

        :param jobfunc: Job to do

        :param iterator:

            Iterator over different parameter settings,
            contains a lock and a queue

        :param ncores:

            Number of processes operating at the same time

        """
        keep_running = True
        process_dict = {}  # Dict containing all subprocesses

        while len(process_dict) > 0 or keep_running:

            terminated_procs_pids = []
            # First check if some processes did finish their job
            for pid, proc in process_dict.iteritems():

                # Remember the terminated processes
                if not proc.is_alive():
                    terminated_procs_pids.append(pid)

            # And delete these from the process dict
            for terminated_proc in terminated_procs_pids:
                process_dict.pop(terminated_proc)

            # If we have fewer active processes than ncores and there is still
            # a job to do, add another process
            if len(process_dict) < ncores and keep_running:
                try:
                    task = iterator.next()
                    proc = mp.Process(target=jobfunc,
                                      args=(task,))
                    proc.start()
                    process_dict[proc.pid] = proc
                except StopIteration:
                    # All tasks have been started
                    keep_running = False

            time.sleep(0.1)


    def main():
        """Runs 1 out of 4 different multiprocessing scenarios"""
        start_scenario(1)


    if __name__ == '__main__':
        main()
SmCaterpillar asked Dec 23 '13 11:12



1 Answer

multiprocessing.Lock is implemented using a Semaphore object provided by the OS. On Linux, the child just inherits a handle to the Semaphore from the parent via os.fork. This isn't a copy of the semaphore; the child inherits the very same handle the parent has, the same way file descriptors can be inherited. Windows, on the other hand, doesn't support os.fork, so it has to pickle the Lock. It does this by creating a duplicate handle to the Windows Semaphore used internally by the multiprocessing.Lock object, using the Windows DuplicateHandle API, whose documentation states:

The duplicate handle refers to the same object as the original handle. Therefore, any changes to the object are reflected through both handles

The DuplicateHandle API allows you to give ownership of the duplicated handle to the child process, so that the child process can actually use it after unpickling it. By creating a duplicated handle owned by the child, you can effectively "share" the lock object.
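To make the "same handle, not a copy" point concrete, here is a minimal sketch. It is written for Python 3 (the code in this thread is Python 2) and assumes a Unix system where the "fork" start method is available. The helper names try_acquire, probe and demo are made up for this illustration. The parent holds the lock while a forked child tries to take it with a timeout; if the child only had a private copy, the attempt would succeed.

```python
import multiprocessing as mp

# Assumption: a Unix system where the "fork" start method exists.
ctx = mp.get_context("fork")


def try_acquire(lock, result):
    """Child: try to take the inherited lock, record whether it worked."""
    got_it = lock.acquire(timeout=0.2)
    result.value = 1 if got_it else 0
    if got_it:
        lock.release()


def probe(lock):
    """Fork a child and report whether it could acquire `lock`."""
    result = ctx.Value("i", -1)  # shared integer, inherited by the child
    child = ctx.Process(target=try_acquire, args=(lock, result))
    child.start()
    child.join()
    return result.value == 1


def demo():
    lock = ctx.Lock()
    lock.acquire()                 # parent holds the lock
    blocked = not probe(lock)      # child should time out
    lock.release()                 # parent lets go
    free = probe(lock)             # child should now succeed
    return blocked, free


if __name__ == "__main__":
    print(demo())
```

If both values come back true, the child observed the parent's acquire and release, which is only possible when both processes refer to the same underlying semaphore.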

Here's the semaphore object in multiprocessing/synchronize.py:

    class SemLock(object):

        def __init__(self, kind, value, maxvalue):
            sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
            debug('created semlock with handle %s' % sl.handle)
            self._make_methods()

            if sys.platform != 'win32':
                def _after_fork(obj):
                    obj._semlock._after_fork()
                register_after_fork(self, _after_fork)

        def _make_methods(self):
            self.acquire = self._semlock.acquire
            self.release = self._semlock.release
            self.__enter__ = self._semlock.__enter__
            self.__exit__ = self._semlock.__exit__

        def __getstate__(self):  # This is called when you try to pickle the `Lock`.
            assert_spawning(self)
            sl = self._semlock
            return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)

        def __setstate__(self, state):  # This is called when unpickling a `Lock`
            self._semlock = _multiprocessing.SemLock._rebuild(*state)
            debug('recreated blocker with handle %r' % state[0])
            self._make_methods()

Note the assert_spawning call in __getstate__, which gets called when pickling the object. Here's how that is implemented:

    #
    # Check that the current thread is spawning a child process
    #

    def assert_spawning(self):
        if not Popen.thread_is_spawning():
            raise RuntimeError(
                '%s objects should only be shared between processes'
                ' through inheritance' % type(self).__name__
                )

That function is the one that makes sure you're "inheriting" the Lock, by calling thread_is_spawning. On Linux, that method just returns False:

    @staticmethod
    def thread_is_spawning():
        return False

This is because Linux doesn't need to pickle the Lock in order to inherit it, so if __getstate__ is actually being called on Linux, we must not be inheriting. On Windows, there's more going on:

    def dump(obj, file, protocol=None):
        ForkingPickler(file, protocol).dump(obj)

    class Popen(object):
        '''
        Start a subprocess to run the code of a process object
        '''
        _tls = thread._local()

        def __init__(self, process_obj):
            ...
            # send information to child
            prep_data = get_preparation_data(process_obj._name)
            to_child = os.fdopen(wfd, 'wb')
            Popen._tls.process_handle = int(hp)
            try:
                dump(prep_data, to_child, HIGHEST_PROTOCOL)
                dump(process_obj, to_child, HIGHEST_PROTOCOL)
            finally:
                del Popen._tls.process_handle
                to_child.close()

        @staticmethod
        def thread_is_spawning():
            return getattr(Popen._tls, 'process_handle', None) is not None

Here, thread_is_spawning returns True if the Popen._tls object has a process_handle attribute. We can see that the process_handle attribute gets created in __init__, then the data we want inherited is passed from the parent to child using dump, then the attribute is deleted. So thread_is_spawning will only be True during __init__. According to this python-ideas mailing list thread, this is actually an artificial limitation added to simulate the same behavior as os.fork on Linux. Windows actually could support passing the Lock at any time, because DuplicateHandle can be run at any time.

All of the above applies to the Queue object because it uses Lock internally.
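The pickling guard is easy to trigger directly. The following is a small sketch for Python 3, where the check still exists although the internals have moved around since the Python 2 source quoted above. The helper name pickling_error is made up for this illustration. It pickles a Lock and a Queue outside of process creation and returns the error message, if any.

```python
import multiprocessing as mp
import pickle


def pickling_error(obj):
    """Return the RuntimeError message raised when pickling obj, or None."""
    try:
        pickle.dumps(obj)
    except RuntimeError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    # Neither object is being handed to a child that is currently starting,
    # so the guard in __getstate__ should refuse to pickle them.
    print(pickling_error(mp.Lock()))
    print(pickling_error(mp.Queue()))
```

This is the same situation a Pool creates when a Lock or Queue is part of a task argument: the task is pickled long after the worker processes were started.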

I would say that inheriting Lock objects is preferable to using a Manager.Lock(), because when you use a Manager.Lock, every single call you make to the Lock must be sent via IPC to the Manager process, which is going to be much slower than using a shared Lock that lives inside the calling process. Both approaches are perfectly valid, though.
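A rough way to see the IPC overhead is to time acquire/release cycles on both kinds of lock. This is only a sketch for Python 3, assuming a Unix system with the "fork" start method; the helper names time_lock and compare and the default of 2000 rounds are arbitrary choices for illustration. The absolute numbers depend heavily on the machine, so no expected output is given.

```python
import multiprocessing as mp
import time


def time_lock(lock, rounds):
    """Time `rounds` uncontended acquire/release cycles on `lock`."""
    start = time.perf_counter()
    for _ in range(rounds):
        lock.acquire()
        lock.release()
    return time.perf_counter() - start


def compare(rounds=2000):
    """Return (seconds for a plain Lock, seconds for a Manager Lock proxy)."""
    ctx = mp.get_context("fork")  # assumption: Unix with "fork" available
    plain = time_lock(ctx.Lock(), rounds)
    with ctx.Manager() as manager:
        # Every call on the proxy is a round trip to the manager process.
        proxied = time_lock(manager.Lock(), rounds)
    return plain, proxied


if __name__ == "__main__":
    plain, proxied = compare()
    print("plain Lock:   %.4f s" % plain)
    print("Manager Lock: %.4f s" % proxied)
```

The measurement is uncontended and single-process on purpose, so any difference comes from the proxy round trips rather than from waiting on other processes.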

Finally, it is possible to pass a Lock to all members of a Pool without using a Manager, using the initializer/initargs keyword arguments:

    lock = None

    def initialize_lock(l):
        global lock
        lock = l

    def scenario_1_pool_no_manager(jobfunc, args, ncores):
        """Runs a pool of processes WITHOUT a Manager for the lock and queue.

        """
        lock = mp.Lock()
        mypool = mp.Pool(ncores, initializer=initialize_lock, initargs=(lock,))
        queue = mp.Queue()

        # make_iterator would need to be adapted to take only (args, queue).
        # Note that a plain mp.Queue is subject to the same pickling check as
        # the Lock, so it would need the same initializer treatment.
        iterator = make_iterator(args, queue)

        # Don't pass lock. It has to be used as a global in the child.
        # (This means `jobfunc` would need to be re-written slightly.)
        mypool.imap(jobfunc, iterator)

        mypool.close()
        mypool.join()

        return read_queue(queue)

This works because arguments passed to initargs get passed to the __init__ method of the Process objects that run inside the Pool, so they end up being inherited, rather than pickled.
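Here is a self-contained sketch of the initializer approach, extended so that both the lock and the queue reach the workers through initargs. It is written for Python 3 and assumes a Unix system with the "fork" start method. The names init_worker, job and run_pool are made up for this illustration and are not part of the original scripts.

```python
import multiprocessing as mp

# Module-level slots that each pool worker fills in via the initializer.
_lock = None
_queue = None


def init_worker(lock, queue):
    """Runs once in every worker process; stores the inherited objects."""
    global _lock, _queue
    _lock = lock
    _queue = queue


def job(idx):
    """Print under the shared lock, then report idx through the queue."""
    with _lock:
        print("By run %d" % idx)
    _queue.put(idx)


def run_pool(args, ncores=3):
    ctx = mp.get_context("fork")  # assumption: Unix with "fork" available
    lock = ctx.Lock()
    queue = ctx.Queue()
    pool = ctx.Pool(ncores, initializer=init_worker, initargs=(lock, queue))
    pool.map(job, args)  # only the plain indices are pickled
    # Drain the queue while the workers are still alive.
    results = [queue.get() for _ in range(len(args))]
    pool.close()
    pool.join()
    return sorted(results)


if __name__ == "__main__":
    print(run_pool(list(range(10))))
```

The results are sorted before returning because the workers finish in no particular order. The queue is read with a fixed number of blocking get() calls instead of polling empty(), which can report an empty queue while items are still in flight.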

dano answered Oct 10 '22 08:10