I have a Python script that I want to use as a controller to another Python script. I have a server with 64 processors, so want to spawn up to 64 child processes of this second Python script. The child script is called:
$ python create_graphs.py --name=NAME
where NAME is something like XYZ, ABC, NYU etc.
In my parent controller script I retrieve the name variable from a list:
my_list = [ 'XYZ', 'ABC', 'NYU' ]
So my question is, what is the best way to spawn off these processes as children? I want to limit the number of children to 64 at a time, so need to track the status (if the child process has finished or not) so I can efficiently keep the whole generation running.
I looked into using the subprocess package, but rejected it because it only spawns one child at a time. I finally found the multiprocessing package, but I admit to being overwhelmed by the whole threads vs. subprocesses documentation.
Right now, my script uses subprocess.call to spawn only one child at a time and looks like this:

    #!/path/to/python
    import subprocess, multiprocessing, Queue
    from multiprocessing import Process

    my_list = [ 'XYZ', 'ABC', 'NYU' ]

    if __name__ == '__main__':
        processors = multiprocessing.cpu_count()
        for i in range(len(my_list)):
            if( i < processors ):
                cmd = ["python", "/path/to/create_graphs.py", "--name="+ my_list[i]]
                child = subprocess.call( cmd, shell=False )

I really want it to spawn up to 64 children at a time. In other Stack Overflow questions I saw people using Queue, but it seems like that creates a performance hit?
Python provides the ability to create and manage new processes via the multiprocessing.Process class. In multiprocessing programs, we may need to change the technique used to start child processes. This technique is called the start method.
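As a minimal sketch of choosing a start method (Python 3.4 or later, where `multiprocessing.get_context` is available), the following starts one child with an explicitly selected method. The names `greet` and `run_with_start_method` are made up for illustration.

```python
import multiprocessing


def greet(name):
    # Runs in the child process.
    print("hello from child:", name)


def run_with_start_method(method, name):
    # get_context returns an object with the same API as the
    # multiprocessing module, bound to the chosen start method.
    ctx = multiprocessing.get_context(method)
    child = ctx.Process(target=greet, args=(name,))
    child.start()
    child.join()
    return child.exitcode


if __name__ == "__main__":
    # "spawn" is available on every platform; "fork" and "forkserver"
    # exist only on Unix-like systems.
    print(multiprocessing.get_all_start_methods())
    print(run_with_start_method("spawn", "XYZ"))
```

Using a context object rather than `multiprocessing.set_start_method` keeps the choice local to this code instead of changing it for the whole program.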
The Pool class in multiprocessing can handle a very large number of tasks with a fixed number of worker processes. Each worker runs multiple jobs, because the pool queues the jobs and hands them out as workers become free. Only the worker processes consume memory, whereas creating one Process object per job starts a separate process for every job.
It works in a map-reduce style: it maps the inputs across the worker processes and collects the output from all of them. Pool.map waits for all the tasks to finish and then returns the output as a list, in the same order as the inputs.
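To make the pool behaviour concrete, here is a small sketch (Python 3) in which 4 workers share 20 queued tasks. The helper names `times_ten` and `summarize` are illustrative only; each task also reports the process id of the worker that ran it.

```python
import multiprocessing
import os


def times_ten(value):
    # Return the result together with the id of the worker that computed it.
    return value * 10, os.getpid()


def summarize(pairs):
    # Split (result, pid) pairs into the ordered results and the set of workers.
    results = [result for result, _ in pairs]
    workers = {pid for _, pid in pairs}
    return results, workers


if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool:
        # map blocks until all 20 tasks have finished.
        pairs = pool.map(times_ten, range(20))
    results, workers = summarize(pairs)
    print(results[:5])        # results come back in input order
    print(len(workers) <= 4)  # no more than 4 worker processes were used
```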
What you are looking for is the process pool class in multiprocessing.

    import multiprocessing
    import subprocess

    def work(cmd):
        return subprocess.call(cmd, shell=False)

    if __name__ == '__main__':
        count = multiprocessing.cpu_count()
        pool = multiprocessing.Pool(processes=count)
        print(pool.map(work, ['ls'] * count))

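Applied to the command line from the question, the same pattern might look like the sketch below (Python 3). The path is the placeholder from the question, `build_cmd` is a helper name chosen here, and using `sys.executable` instead of the literal "python" is an assumption that the child should run under the same interpreter.

```python
import multiprocessing
import subprocess
import sys

SCRIPT = "/path/to/create_graphs.py"  # placeholder path from the question


def build_cmd(name, script=SCRIPT):
    # Same command line as in the question, using the current interpreter.
    return [sys.executable, script, "--name=" + name]


def work(name):
    # Runs in a pool worker; blocks until the child script exits.
    return subprocess.call(build_cmd(name), shell=False)


if __name__ == "__main__":
    my_list = ["XYZ", "ABC", "NYU"]
    # Pool() defaults to one worker per CPU, so at most that many
    # child scripts run at the same time.
    with multiprocessing.Pool() as pool:
        exit_codes = pool.map(work, my_list)
    print(dict(zip(my_list, exit_codes)))
```

The pool does the bookkeeping: as soon as one child script exits, its worker picks up the next name from the list.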
And here is a calculation example to make it easier to understand. The following will divide 10000 tasks among N processes, where N is the CPU count. Note that I'm passing None as the number of processes. This will cause the Pool class to use cpu_count for the number of processes.

    import multiprocessing

    def calculate(value):
        return value * 10

    if __name__ == '__main__':
        pool = multiprocessing.Pool(None)
        tasks = range(10000)
        results = []
        # The callback is called once with the complete list of results,
        # so results ends up holding that one list.
        r = pool.map_async(calculate, tasks, callback=results.append)
        r.wait()  # Wait on the results
        print(results)

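Because the map_async callback receives the whole result list in a single call, appending it to `results` yields a list that contains one list. If a flat list is what you want, one option is to skip the callback and ask the async result for its value, as in this sketch (Python 3; `run` is a name chosen here):

```python
import multiprocessing


def calculate(value):
    return value * 10


def run(tasks, processes=None):
    # processes=None lets the pool pick the worker count from the CPU count.
    with multiprocessing.Pool(processes) as pool:
        async_result = pool.map_async(calculate, tasks)
        # get() blocks until every task is done and returns the flat list.
        return async_result.get()


if __name__ == "__main__":
    results = run(range(10000))
    print(len(results), results[:3])
```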
Here is the solution I came up with, based on Nadia and Jim's comments. I am not sure if it is the best way, but it works. The original child script being called needs to be a shell script because I need to use some 3rd party apps including Matlab. So I had to take it out of Python and code it in bash.

    import sys
    import os
    import multiprocessing
    import subprocess

    def work(staname):
        print('Processing station: %s' % staname)
        print('Parent process: %s' % os.getppid())
        print('Process id: %s' % os.getpid())
        # Each list element is one argument: shell, script, option
        cmd = [ "/bin/bash", "/path/to/executable/create_graphs.sh", "--name=%s" % (staname) ]
        return subprocess.call(cmd, shell=False)

    if __name__ == '__main__':
        my_list = [ 'XYZ', 'ABC', 'NYU' ]
        my_list.sort()
        print(my_list)

        # Get the number of processors available
        num_processes = multiprocessing.cpu_count()
        threads = []
        len_stas = len(my_list)

        print("+++ Number of stations to process: %s" % (len_stas))

        # run until all the threads are done, and there is no data left
        for list_item in my_list:
            # if we aren't using all the processors AND there is still data left to
            # compute, then spawn another thread
            if( len(threads) < num_processes ):
                p = multiprocessing.Process(target=work,args=[list_item])
                p.start()
                print('%s %s' % (p, p.is_alive()))
                threads.append(p)
            else:
                # all slots are in use: drop finished children from the list
                # (list_item itself is not started on this pass)
                for thread in threads:
                    if not thread.is_alive():
                        threads.remove(thread)

Does this seem like a reasonable solution? I tried to use Jim's while loop format, but my script just returned nothing. I am not sure why that would be. Here is the output when I run the script with Jim's 'while' loop replacing the 'for' loop:

    hostname{me}2% controller.py
    ['ABC', 'NYU', 'XYZ']
    Number of processes: 64
    +++ Number of stations to process: 3
    hostname{me}3%

When I run it with the 'for' loop, I get something more meaningful:

    hostname{me}6% controller.py
    ['ABC', 'NYU', 'XYZ']
    Number of processes: 64
    +++ Number of stations to process: 3
    Processing station: ABC
    Parent process: 1056
    Process id: 1068
    Processing station: NYU
    Parent process: 1056
    Process id: 1069
    Processing station: XYZ
    Parent process: 1056
    Process id: 1071
    hostname{me}7%

So this works, and I am happy. However, I still don't get why I can't use Jim's 'while' style loop instead of the 'for' loop I am using. Thanks for all the help - I am impressed with the breadth of knowledge @ stackoverflow.
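For comparison, here is a minimal sketch of a 'while'-style loop that keeps at most a fixed number of children running and never skips an item when all slots are busy. It is not necessarily the loop Jim suggested. It uses subprocess.Popen directly instead of multiprocessing.Process, and `run_all`, `max_children` and `poll_interval` are names chosen for illustration (Python 3).

```python
import subprocess
import sys
import time


def run_all(commands, max_children, poll_interval=0.05):
    """Run every command, keeping at most max_children alive at once.

    Returns the exit codes in the same order as commands.
    """
    if max_children < 1:
        raise ValueError("max_children must be at least 1")

    pending = list(enumerate(commands))  # (index, command) not started yet
    running = {}                         # index -> Popen of live children
    exit_codes = [None] * len(commands)

    while pending or running:
        # Start new children while there is a free slot.
        while pending and len(running) < max_children:
            index, cmd = pending.pop(0)
            running[index] = subprocess.Popen(cmd)

        # Collect the children that have finished.
        for index, child in list(running.items()):
            code = child.poll()          # None while still running
            if code is not None:
                exit_codes[index] = code
                del running[index]

        if running:
            time.sleep(poll_interval)    # avoid a busy loop

    return exit_codes


if __name__ == "__main__":
    # Stand-in commands; in the controller script this would be the
    # create_graphs.sh command line for each station.
    names = ["XYZ", "ABC", "NYU"]
    commands = [[sys.executable, "-c", "print('processing %s')" % n] for n in names]
    print(run_all(commands, max_children=2))
```

The loop condition is "work left to start or children still alive", so the script does not exit before the last child has been collected, and an item that finds every slot busy simply stays in `pending` until a slot frees up.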