 

python multiprocessing: some functions do not return when they are complete (queue material too big)


I am using multiprocessing's Process and Queue. I start several functions in parallel, and most behave nicely: they finish, their output goes to their Queue, and they show up as .is_alive() == False. But for some reason a couple of functions are not behaving. They always show .is_alive() == True, even after the last line in the function (a print statement saying "Finished") has completed. This happens regardless of the set of functions I launch, even if there's only one. If not run in parallel, the functions behave fine and return normally. What kind of thing might be the problem?

Here's the generic function I'm using to manage the jobs. The only thing I'm not showing is the functions I'm passing to it. They're long, often use matplotlib, and sometimes launch shell commands, but I cannot figure out what the failing ones have in common.

def runFunctionsInParallel(listOf_FuncAndArgLists):
    """
    Take a list of lists like [function, arg1, arg2, ...]. Run those
    functions in parallel, wait for them all to finish, and return the
    list of their return values, in order.
    """
    from multiprocessing import Process, Queue

    def storeOutputFFF(fff, theArgs, que):  # add an argument to the function for assigning a queue
        print 'MULTIPROCESSING: Launching %s in parallel ' % fff.func_name
        que.put(fff(*theArgs))  # we're putting the return value into the queue
        print 'MULTIPROCESSING: Finished %s in parallel! ' % fff.func_name
        # We get this far even for "bad" functions
        return

    queues = [Queue() for fff in listOf_FuncAndArgLists]  # create a queue object for each function
    jobs = [Process(target=storeOutputFFF, args=[funcArgs[0], funcArgs[1:], queues[iii]])
            for iii, funcArgs in enumerate(listOf_FuncAndArgLists)]
    for job in jobs:
        job.start()  # Launch them all

    import time
    from math import sqrt
    n = 1
    while any([jj.is_alive() for jj in jobs]):  # debugging section shows progress updates
        n += 1
        time.sleep(5 + sqrt(n))  # Wait a while before the next update; slow down updates for really long runs.
        print('\n---------------------------------------------------\n' +
              '\t'.join(['alive?', 'Job', 'exitcode', 'Func']) +
              '\n---------------------------------------------------')
        print('\n'.join(['%s:\t%s:\t%s:\t%s' % (job.is_alive() * 'Yes', job.name, job.exitcode,
                                                listOf_FuncAndArgLists[ii][0].func_name)
                         for ii, job in enumerate(jobs)]))
        print('---------------------------------------------------\n')

    # I never get to the following line when one of the "bad" functions is running.
    for job in jobs:
        job.join()  # Wait for them all to finish... Hm, is this needed to get at the Queues?

    # And now, collect all the outputs:
    return [queue.get() for queue in queues]
asked Aug 07 '12 by CPBL


People also ask

How does multiprocessing queue work in Python?

A queue is a data structure to which items can be added with a call to put() and from which items can be retrieved with a call to get(). multiprocessing.Queue provides a first-in, first-out (FIFO) queue, which means that items are retrieved from the queue in the order they were added.
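For illustration, here is a minimal example of that FIFO behaviour (Python 3 syntax, unlike the Python 2 code in the question; the names are made up for the demo):

from multiprocessing import Process, Queue

def worker(q):
    # Items come out in the same order they went in (FIFO).
    q.put('first')
    q.put('second')

if __name__ == '__main__':
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    print(q.get())  # -> 'first'
    print(q.get())  # -> 'second'
    p.join()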

Is multiprocessing queue slow?

Putting and getting individual items on a multiprocessing queue is relatively slow, because each item is pickled and sent through a pipe separately. Wrappers such as QuickQueue instead batch several items into one list and enqueue that list as a single item, which is considerably faster than putting items one at a time.
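A rough sketch of the batching idea using only the standard library (QuickQueue itself is a third-party package; this just shows why one large put() beats many small ones):

from multiprocessing import Queue

def send_individually(q, items):
    # One pickle + pipe write per item: many small round trips.
    for item in items:
        q.put(item)

def send_batched(q, items):
    # One pickle + pipe write for the whole list: a single large transfer,
    # which is the pattern wrappers like QuickQueue automate.
    q.put(items)

if __name__ == '__main__':
    q = Queue()
    send_batched(q, list(range(1000)))
    batch = q.get()   # the consumer unpacks the batch itself
    print(len(batch)) # -> 1000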

Does multiprocessing in Python use multiple cores?

Yes. A multiprocessor system is a computer with more than one central processor, and most modern single processors have multiple cores. In CPython, threads cannot run CPU-bound Python code in parallel because of the Global Interpreter Lock (GIL); the multiprocessing module sidesteps this by giving each process its own interpreter, so work can genuinely run on multiple cores at the same time.
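As a small illustration, here is one common way to spread CPU-bound work across the available cores with a process pool (the busy-work function burn is invented for the demo):

from multiprocessing import Pool, cpu_count

def burn(n):
    # CPU-bound busy work; each process can occupy its own core.
    total = 0
    for i in range(n):
        total += i * i
    return total

if __name__ == '__main__':
    with Pool(processes=cpu_count()) as pool:
        results = pool.map(burn, [10 ** 6] * cpu_count())
    print(len(results))  # one result per job, computed in parallel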

Is Python multiprocessing queue thread safe?

Yes, it is. From https://docs.python.org/3/library/multiprocessing.html#exchanging-objects-between-processes: "Queues are thread and process safe."


1 Answer

Alright, it seems that the pipe used to fill the Queue gets plugged when the output of a function is too big (my crude understanding; this looks like the unresolved/closed bug http://bugs.python.org/issue8237). I have modified the code in my question so that there is some buffering (queues are regularly emptied while the processes are running), which solves all my problems. So now this takes a collection of tasks (functions and their arguments), launches them, and collects the outputs. I wish it were simpler/cleaner looking.
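A minimal sketch of that buffering fix (the helper names here are mine, not the final library's): drain each queue while polling the processes, and only join once nothing can be blocked on a full pipe.

from multiprocessing import Process, Queue
import time

def big_worker(que):
    # Returns something large; without draining, the underlying pipe fills
    # and the child blocks inside put(), so it never exits.
    que.put('x' * 10000000)

def drain_while_waiting(jobs, queues):
    # Periodically empty each queue while polling; q.empty() is only
    # approximate, but that is fine for periodic draining.
    results = [[] for _ in queues]
    while any(job.is_alive() for job in jobs):
        for i, que in enumerate(queues):
            while not que.empty():
                results[i].append(que.get())
        time.sleep(1)
    for i, que in enumerate(queues):  # final sweep after all have exited
        while not que.empty():
            results[i].append(que.get())
    for job in jobs:
        job.join()  # safe now: no child is stuck writing to a full pipe
    return results

if __name__ == '__main__':
    queues = [Queue()]
    jobs = [Process(target=big_worker, args=(queues[0],))]
    jobs[0].start()
    print(len(drain_while_waiting(jobs, queues)[0][0]))  # -> 10000000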

Edit (Sep 2014; updated Nov 2017, rewritten for readability): I'm updating this answer with the enhancements I've made since. The new code (same function, but with better features) is here: https://gitlab.com/cpbl/cpblUtilities/blob/master/parallel.py

The calling interface and its description are also below.

def runFunctionsInParallel(*args, **kwargs):
    """
    This is the main/only interface to class cRunFunctionsInParallel.
    See its documentation for arguments.
    """
    return cRunFunctionsInParallel(*args, **kwargs).launch_jobs()


###########################################################################
class cRunFunctionsInParallel():
###########################################################################
    """Run any list of functions, each with any arguments and
    keyword-arguments, in parallel.

    The functions/jobs should return (if anything) pickleable results. In
    order to avoid processes getting stuck due to the output queues
    overflowing, the queues are regularly collected and emptied.

    You can now pass os.system or etc to this as the function, in order
    to parallelize at the OS level, with no need for a wrapper: I made use
    of hasattr(builtinfunction, 'func_name') to check for a name.

    Parameters
    ----------
    listOf_FuncAndArgLists : a list of lists
        List of up-to-three-element lists, like [function, args, kwargs],
        specifying the set of functions to be launched in parallel. If an
        element is just a function, rather than a list, then it is assumed
        to have no arguments or keyword arguments. Thus, possible formats
        for elements of the outer list are:
          function
          [function, list]
          [function, list, dict]
    kwargs : dict
        One can also supply the kwargs once, for all jobs (or for those
        without their own non-empty kwargs specified in the list)
    names : an optional list of names to identify the processes.
        If omitted, the function name is used, so if all the functions are
        the same (ie merely with different arguments), then they would be
        named indistinguishably
    offsetsSeconds : int or list of ints
        delay some functions' start times
    expectNonzeroExit : True/False
        Normal behaviour is to not proceed if any function exits with a
        failed exit code. This can be used to override this behaviour.
    parallel : True/False
        Whenever the list of functions is longer than one, functions will
        be run in parallel unless this parameter is passed as False
    maxAtOnce : int
        If nonzero, this limits how many jobs will be allowed to run at
        once. By default, this is set according to how many processors
        the hardware has available.
    showFinished : int
        Specifies the maximum number of successfully finished jobs to show
        in the text interface (before the last report, which should always
        show them all).

    Returns
    -------
    Returns a tuple of (return codes, return values), each a list in order
    of the jobs provided.

    Issues
    ------
    Only tested on POSIX OSes.

    Examples
    --------
    See the testParallel() method in this module
    """
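For concreteness, a hypothetical usage sketch based on the docstring above. The import path is my assumption from the linked repository layout, and the worker functions are invented:

from cpblUtilities.parallel import runFunctionsInParallel  # assumed import path

def square(x):
    return x * x

def greet(name, punctuation='!'):
    return 'Hello, ' + name + punctuation

if __name__ == '__main__':
    # Elements may be a bare function, [function, args], or [function, args, kwargs]:
    exitcodes, values = runFunctionsInParallel([
        [square, [3]],
        [square, [4]],
        [greet, ['world'], {'punctuation': '?'}],
    ], names=['sq3', 'sq4', 'hi'])
    print(values)  # -> [9, 16, 'Hello, world?']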
answered Sep 23 '22 by CPBL