 

Python Multiprocessing: Only one process is running

I am trying to spawn multiple parallel processes using the Python multiprocessing module. Basically, I did something like

from multiprocessing import Pool

pool = Pool(30)
results = [pool.apply_async(foo, (trainData, featureVector, terms, selLabel)) for selLabel in selLabels]
modelFiles = dict()
for r in results:
    tmp = r.get()
    modelFiles[tmp[0]] = tmp[1]

30 processes are spawned; however, most of them seem to be sleeping while only one process is actually running. Below is what I get from ps:

PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
31267 74.6  2.4 7125412 6360080 pts/1 Sl+  13:06  24:25  \_ python2.6 /home/PerlModules/Python/DoOVA.py
31427 27.4  2.3 6528532 6120904 pts/1 R+   13:20   5:18      \_ python2.6 /home/PerlModules/Python/DoOVA.py
31428  0.0  1.3 4024724 3617016 pts/1 S+   13:20   0:00      \_ python2.6 /home/PerlModules/Python/DoOVA.py
31429  0.0  1.3 4024724 3617016 pts/1 S+   13:20   0:00      \_ python2.6 /home/PerlModules/Python/DoOVA.py
31430  0.0  1.3 4024724 3617016 pts/1 S+   13:20   0:00      \_ python2.6 /home/PerlModules/Python/DoOVA.py

DoOVA.py is the script I am running. Most of the processes have a status of S+.

Could anyone give me a clue about what the problem is? I know the input argument featureVector is pretty large, around 300MB. Would that be a problem? The machine I am running on has several TB of memory, though.

foo does something like:

import os

def foo(trainData, featureVector, terms, selLabel):
    outputFile = 'train_' + selLabel + '.dat'
    annotation = dict()
    for id in trainData:
        if trainData[id] == selLabel:
            annotation[id] = '1'
        else:
            annotation[id] = '-1'
    try:
        os.mkdir(selLabel)
        os.chdir(selLabel)
    except OSError:
        # directory already exists from a previous run
        os.chdir(selLabel)
    ### Some more functions, which involve a command line call through call from the subprocess module
    os.chdir('../')
    return (selLabel, 'SVM_' + selLabel + '.model')

All other input arguments are small. The machine has at least 100 CPUs. In every run, it takes the script a long time before any directory is created, even though no significant computation happens in foo before os.mkdir().

Bin Zhou asked Sep 13 '14

1 Answer

As the comments indicate, you want to pass featureVector using the initializer and initargs arguments to Pool. On Unix-type systems this will result in a massive performance increase (even if there's only one item in selLabels) because the value will be passed to the child processes essentially for free via os.fork. Otherwise, each time foo is called, featureVector will be pickled by the parent process, passed through a pipe and unpickled by the child process. This takes a long time and essentially serializes all the child processes, since each of them will be waiting for the parent process to pickle and send a copy of featureVector for each call, one by one.

Since there's some confusion about what I'm talking about above, here's a longer explanation of what's happening in your code as it's currently written:

When you create the Pool object, 30 worker processes are immediately created, all children of the main process that created the Pool object. A pipe is created to communicate with each child process. This pipe allows two-way communication between the parent process and the child processes. The parent uses the pipe to instruct the children what to do, and the children use the pipe to notify the parent of the result of any operations.
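You can verify that the workers exist before any task is submitted (a throwaway check on Linux; the ps flags vary by platform):

from multiprocessing import Pool
import os

pool = Pool(30)   # 30 child processes are forked right here,
                  # before any work has been submitted
os.system('ps --ppid %d' % os.getpid())   # lists the 30 idle workers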

When you first call pool.apply_async, the parent process sends a command through the pipe instructing a child process to execute the foo function with the arguments supplied. Since one of the arguments is huge, 300MB, this ends up taking a very long time: the parent process has to pickle the object, which converts it (and everything it references) into a byte stream that can be sent through the pipe.
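To get a feel for that cost, you can time the pickling step on its own (a rough diagnostic sketch, with featureVector standing in for your 300MB object):

import pickle
import time

start = time.time()
payload = pickle.dumps(featureVector, pickle.HIGHEST_PROTOCOL)
print('pickled %d bytes in %.1f seconds' % (len(payload), time.time() - start))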

Since the pipe can only hold about 64k (the Linux default) and you're sending much more than that, this effectively synchronizes the parent and one of the child processes. The parent process can only send the arguments as fast as the child process can receive and unpickle them, and the child process can only receive the arguments as fast as the parent can pickle and send them. While this is going on, all the other child processes have to wait: the parent process can only send a command to one child process at a time.
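You can see the same blocking with a bare Pipe (a toy illustration; big_object is just a stand-in for anything much larger than the pipe buffer):

from multiprocessing import Process, Pipe

def reader(conn):
    conn.recv()   # unpickles the object as the bytes arrive

big_object = 'x' * (200 * 1024 * 1024)   # stand-in for featureVector
parent_conn, child_conn = Pipe()
p = Process(target=reader, args=(child_conn,))
p.start()
parent_conn.send(big_object)   # blocks until the child drains the pipe
p.join()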

Once the parent process has finished sending all the arguments for the first call of foo, it can move on to sending the command to call foo a second time. Very soon after that, once the child process has finished receiving all the arguments, the child will call foo. (This is why it takes a long time before any directory is created: it takes a long time before foo is even called.) After foo returns, the child process waits for the parent to send another command. If foo itself executes quickly enough, it's possible that the same child process that received the first command to call foo will also receive the second.

Unless foo itself takes a long time to execute, as long as or longer than it takes to send featureVector over the pipe, you'll be effectively limited to just one child process executing at a time. The parent process will try to command the child processes to call foo as fast as it can, but because featureVector is so big, it can only do so at a very slow rate. By the time it's done sending the command to one process to call foo, the previous process it commanded will have finished calling foo long ago. There will be little or no overlap between running child processes.

In order to fix the performance problem in your code, you'll want to do something like this:

from multiprocessing import Pool

def child_initialize(_trainData, _featureVector, _terms):
    global trainData, featureVector, terms
    trainData = _trainData
    featureVector = _featureVector
    terms = _terms

def foo(selLabel):
    # trainData, featureVector and terms are the module-level globals
    # set up once per worker by child_initialize
    ...

pool = Pool(30, initializer=child_initialize, initargs=(trainData, featureVector, terms))
results = [pool.apply_async(foo, (selLabel,)) for selLabel in selLabels]

This code also passes trainData and terms using initargs, on the assumption that they don't change either.
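Collecting the results then works exactly as in your original loop (a sketch; modelFiles is the dictionary from your question):

modelFiles = dict()
for r in results:
    selLabel, modelFile = r.get()
    modelFiles[selLabel] = modelFile
pool.close()
pool.join()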

While this should result in a huge performance improvement and allow the child processes to run in parallel, it's unlikely to mean that the child processes will show up in ps in the runnable state much more often. Your example foo function looks like it'll spend most of its time waiting for the "command line call" to finish.

Ross Ridge answered Nov 15 '22