
Using multiprocessing pool of workers

I have written the following code to put my lazy second CPU core to work. What the code basically does is first find the desired "sea" files in the directory hierarchy and then execute a set of external scripts to process these binary "sea" files, producing between 50 and 100 text and binary files. As the title of the question suggests, I want to do this in parallel to increase the processing speed.

This question originates from a long discussion we have been having on the IPython users list, titled "Cannot start ipcluster", which started my experimentation with IPython's parallel processing functionalities.

The issue is that I can't get this code running correctly. If the folders that contain "sea" files house only "sea" files, the script finishes its execution without fully performing the external script runs. (Say I have 30-50 external scripts to run; my multiprocessing-enabled script exits after executing only the first script in this chain.) Interestingly, if I run the script on an already-processed folder (where the "sea" files were processed beforehand and the output files are already in the folder), then it runs, but this time I get speed-ups of about 2.4 to 2.7x with respect to the linear processing timings. That is more than I expected, since I only have a Core 2 Duo 2.5 GHz CPU in my laptop. (Although I have a CUDA-powered GPU, it has nothing to do with my current parallel computing struggle :)

What do you think might be the source of this issue?

Thank you for all comments and suggestions.

#!/usr/bin/env python

from multiprocessing import Pool
from subprocess import call
import os


def find_sea_files():
    file_list, path_list = [], []
    init = os.getcwd()

    for root, dirs, files in os.walk('.'):
        dirs.sort()
        for file in files:
            if file.endswith('.sea'):
                file_list.append(file)
                # Record the absolute path of the containing folder
                os.chdir(root)
                path_list.append(os.getcwd())
                os.chdir(init)

    return file_list, path_list


def process_all(pf):
    # pf is a [path, filename] pair
    os.chdir(pf[0])
    call(['postprocessing_saudi', pf[1]])


if __name__ == '__main__':
    pool = Pool(processes=2)              # start 2 worker processes
    files, paths = find_sea_files()
    pathfile = [[paths[i], files[i]] for i in range(len(files))]
    pool.map(process_all, pathfile)
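
One quick way to check whether the external scripts are failing silently inside the workers is to inspect the return code of call(); a minimal diagnostic sketch (the message format here is just illustrative):

def process_all(pf):
    os.chdir(pf[0])
    rc = call(['postprocessing_saudi', pf[1]])
    if rc != 0:
        # a non-zero exit status points at the external script itself
        print "postprocessing_saudi exited with %d in %s" % (rc, pf[0])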
asked Oct 19 '09 by Gökhan Sever

2 Answers

I would start with getting a better feeling for what is going on in the worker processes. The multiprocessing module comes with built-in logging for its subprocesses if you need it (see the logging sketch after the code below). Since you have simplified the code to narrow down the problem, I would just debug with a few print statements, like so (or you can pretty-print the pf list):


def process_all(pf):
    print "PID: ", os.getpid()
    print "Script Dir: ", pf[0]
    print "Script: ", pf[1]
    os.chdir(pf[0])
    call(['postprocessing_saudi', pf[1]])


if __name__ == '__main__':
    pool = Pool(processes=2)
    files, paths = find_sea_files()
    pathfile = [[paths[i], files[i]] for i in range(len(files))]
    pool.map(process_all, pathfile, 1)  # ensure the chunk size is 1
    pool.close()
    pool.join()

The version of Python that I accomplished this with is 2.6.4.
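
For the built-in logging mentioned above, a minimal sketch (multiprocessing.log_to_stderr() is the standard hook; the DEBUG level is just a suggestion):

import logging
import multiprocessing

logger = multiprocessing.log_to_stderr()
logger.setLevel(logging.DEBUG)  # show worker start/stop and task traffic on stderr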

answered by Eric Lubow

There are several things I can think of:

1) Have you printed out the pathfile list? Are you sure all the entries are properly generated?

a) I ask because your os.walk is a bit interesting; the dirs.sort() should be OK, but seems quite unnecessary. os.chdir() in general shouldn't be used; the restoration should be all right, but in general you should just be joining root onto init, as in the sketch below.
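
A minimal sketch of find_sea_files along those lines, without the chdir dance (os.path.abspath(root) resolves root against the current working directory, so it is equivalent to joining root onto init):

def find_sea_files():
    file_list, path_list = [], []
    for root, dirs, files in os.walk('.'):
        dirs.sort()
        for name in files:
            if name.endswith('.sea'):
                file_list.append(name)
                path_list.append(os.path.abspath(root))  # no chdir needed
    return file_list, path_list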

2) I've seen multiprocessing on Python 2.6 have problems spawning subprocesses from pools. (I specifically had a script use multiprocessing to spawn subprocesses. Those subprocesses could not then correctly use multiprocessing themselves; the pool locked up.) Try Python 2.5 with the multiprocessing backport.

3) Try PiCloud's cloud.mp module (which wraps multiprocessing, but handles pools a tad differently) and see if that works.

You would do

cloud.mp.join(cloud.mp.map(process_all, pathfile))

(Disclaimer: I am one of the developers of PiCloud)

answered by UsAaR33