I am trying to spawn multiple parallel processes using the Python multiprocessing module. Basically, I did something like
pool = Pool(30)
results = [pool.apply_async(foo, (trainData, featureVector, terms, selLabel)) for selLabel in selLabels]
for r in results:
    tmp = r.get()
    modelFiles[tmp[0]] = tmp[1]
30 processes are spawned; however, it seems most of the processes have been put to sleep while only one process is actually running. Below is what I get from ps:
PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
31267 74.6 2.4 7125412 6360080 pts/1 Sl+ 13:06 24:25 \_ python2.6 /home/PerlModules/Python/DoOVA.py
31427 27.4 2.3 6528532 6120904 pts/1 R+ 13:20 5:18 \_ python2.6 /home/PerlModules/Python/DoOVA.py
31428 0.0 1.3 4024724 3617016 pts/1 S+ 13:20 0:00 \_ python2.6 /home/PerlModules/Python/DoOVA.py
31429 0.0 1.3 4024724 3617016 pts/1 S+ 13:20 0:00 \_ python2.6 /home/PerlModules/Python/DoOVA.py
31430 0.0 1.3 4024724 3617016 pts/1 S+ 13:20 0:00 \_ python2.6 /home/PerlModules/Python/DoOVA.py
DoOVA.py is the script I am running. Most of them have status S+.
Could anyone give me some clue about what the problem is? I know the input argument featureVector is pretty large in size, around 300MB. Would that be a problem? The machine I am running on has several TB of memory, though.
foo does something like:
def foo(trainData, featureVector, terms, selLabel, penalty):
    outputFile = 'train_' + selLabel + '.dat'
    annotation = dict()
    for id in trainData:
        if trainData[id] == selLabel:
            annotation[id] = '1'
        else:
            annotation[id] = '-1'
    try:
        os.mkdir(selLabel)
        os.chdir(selLabel)
    except OSError:
        os.chdir(selLabel)
    ### Some more functions, which involve a command line call through call from the subprocess module
    os.chdir('../')
    return (selLabel, 'SVM_' + selLabel + '.model')
All other input arguments are small in size, and the machine has at least 100 CPUs. In every run it takes the script a long time before any directory is created, even though there is no significant computation in foo before os.mkdir().
As the comments indicate, you want to pass featureVector using the initializer and initargs arguments to Pool. On Unix-type systems this will result in a massive performance increase (even if there's only one item in selLabels) because the value will be passed to the child processes essentially for free using os.fork. Otherwise, each time foo is called, featureVector will get pickled by the parent process, passed through a pipe and unpickled by the child process. This will take a long time, and will essentially serialize all the child processes, since they'll be waiting for the parent process to pickle and send a copy of featureVector for each call, one by one.
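To get a feel for that per-call cost, you can time the pickling step directly. This is just a rough sketch (in modern Python); the list below is an illustrative stand-in for a large argument like featureVector, not your actual data:

```python
import pickle
import time

# Stand-in for a large argument such as featureVector (illustrative only).
big = [float(i) for i in range(2_000_000)]

start = time.time()
payload = pickle.dumps(big)
print("pickled %d bytes in %.2fs" % (len(payload), time.time() - start))
```

In the original code this serialization cost is paid on every apply_async call; with initializer/initargs it is paid once per worker at most, and not at all when the data is inherited via fork.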
Since there's some confusion about what I'm talking about above, here's a longer explanation of what's happening in your code as it's currently written:
When you create the Pool object, 30 worker processes are immediately created, all children of the main process that created the Pool object. In order to communicate with each child process, a pipe is created. This pipe allows two-way communication between the parent process and the child processes. The parent uses the pipe to instruct the child process what to do, and the children use the pipe to notify the parent of the result of any operations.
When you first call pool.apply_async, the parent process sends a command through the pipe instructing a child process to execute the foo function with the arguments supplied. Since one of the arguments is huge, around 300MB, this ends up taking a very long time: the parent process has to pickle the object, converting it (and everything it references) into a byte stream that can be sent through a pipe.
Since the pipe can only hold about 64KB (the Linux default) and you're sending much more than that, this effectively synchronizes the parent and one of the child processes. The parent process can only send the arguments as fast as the child process can receive and unpickle them, and the child process can only receive the arguments as fast as the parent can pickle and send them. While this is going on, all the other child processes have to wait; the parent process can only send a command to one child process at a time.
Once the parent process has finished sending all the arguments for the first call of foo, it can move on to sending a command to call foo a second time. Very soon after that, once the child process has finished receiving all the arguments, the child will call foo. (This is why it takes a long time before any directory is created: it takes a long time before foo is even called.) After foo returns, the child process waits for the parent process to send another command. If foo itself takes a short enough amount of time to execute, it's possible that the same child process that received the first command to call foo will also receive the second command to call foo.
Unless foo itself takes a long time to execute (as long as or longer than it takes to send featureVector over the pipe), you'll be effectively limited to just one child process executing at a time. The parent process will try to command the child processes to call foo as fast as it can, but because featureVector is so big, it can only do so at a very slow rate. Once it's done sending the command to one process to call foo, the previous process it commanded will have finished its call long ago. There will be little or no overlap between running child processes.
To fix the performance problem in your code, you'll want to do something like this:
def child_initialize(_trainData, _featureVector, _terms):
    global trainData, featureVector, terms
    trainData = _trainData
    featureVector = _featureVector
    terms = _terms

def foo(selLabel):
    ...

pool = Pool(30, initializer=child_initialize, initargs=(trainData, featureVector, terms))
results = [pool.apply_async(foo, (selLabel,)) for selLabel in selLabels]
(Note that the keyword argument is initializer, not initialize.) This code also passes trainData and terms using initargs, on the assumption that they don't change either.
While this should result in a huge performance improvement and allow the child processes to run in parallel, it's unlikely to mean that the child processes will show up in ps in the runnable state much more often. Your example foo function looks like it'll spend most of its time waiting for the "command line call" to finish.