
Python Multiprocessing Worker/Queue

I have a Python function that has to run 12 times in total. Currently I have this set up to use Pool from the multiprocessing library to run them in parallel. Typically I run 6 at a time because the function is CPU intensive and running all 12 in parallel often causes the program to crash. When we do 6 at a time, the second set of 6 will not begin until all of the first 6 processes are finished. Ideally, we would like another one (e.g. the 7th) to kick off as soon as one from the initial batch of 6 finishes, so that 6 are running at once while there are more to start. Right now the code looks like this (it would be called twice, passing the first 6 elements in one list and then the second 6 in another):

from multiprocessing import Pool

def start_pool(project_list):
    pool = Pool(processes=6)
    pool.map(run_assignments_parallel, project_list[0:6])

So I have been trying to implement a worker/queue solution and have run into some issues. I have a worker function that looks like this:

def worker(work_queue, done_queue):
    try:
        for proj in iter(work_queue.get, 'STOP'):
            print proj
            run_assignments_parallel(proj)
            done_queue.put('finished ' + proj)
    except Exception, e:
        done_queue.put("%s failed on %s with: %s" % (current_process().name, proj, e.message))
    return True

And the code to call the worker function is as follows:

workers = 6
work_queue = Queue()
done_queue = Queue()
processes = []
for project in project_list:
    print project
    work_queue.put(project)
for w in xrange(workers):
    p = Process(target=worker, args=(work_queue, done_queue))
    p.start()
    processes.append(p)
    work_queue.put('STOP')
for p in processes:
    p.join()
    done_queue.put('STOP')
for status in iter(done_queue.get, 'STOP'):
    print status

project_list is just a list of paths for the 12 projects that need to be run through run_assignments_parallel.

The way this is written now, the function is getting called more than once for the same process (project), and I can't really tell what is going on. This code is based on an example I found, and I am pretty sure the looping structure is messed up. Any help would be great, and I apologize for my ignorance on the matter. Thanks!

asked Mar 06 '14 by user2503169


1 Answer

Ideally, we would like another one (e.g. the 7th) to kick off as soon as one from the initial batch of 6 finishes, so that 6 are running at once while there are more to start.

All you need to change is to pass all 12 input parameters instead of 6:

from multiprocessing import Pool
pool = Pool(processes=6) # run no more than 6 at a time
pool.map(run_assignments_parallel, project_list) # pass full list (12 items)
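
For reference, a minimal self-contained sketch of that pattern, assuming run_assignments_parallel takes a single project path (the placeholder body below only simulates the real CPU-intensive work, which isn't shown in the question):

from multiprocessing import Pool
import time

def run_assignments_parallel(project_path):
    # placeholder for the real CPU-intensive work on one project
    time.sleep(1)
    return 'finished ' + project_path

if __name__ == '__main__':
    project_list = ['project_%d' % i for i in range(12)]  # the 12 project paths
    pool = Pool(processes=6)  # never more than 6 worker processes alive at once
    # map() hands the next item to whichever worker frees up first,
    # so the 7th project starts as soon as any of the first 6 finishes
    results = pool.map(run_assignments_parallel, project_list)
    pool.close()
    pool.join()
    for status in results:
        print status

If you want to see each status as soon as that project finishes rather than after they are all done, pool.imap_unordered(run_assignments_parallel, project_list) yields results in completion order.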
answered Sep 30 '22 by jfs