I would like to implement the following workflow in Python. In the example I have 5 processes running in parallel: one process acts as the manager and the others are all workers. The manager performs its control routine repeatedly, in round-robin fashion, until all workers stop.
The main idea is that each worker starts with a distinct list of work. However, while processing that work, the workers add more work to their own lists, and in some cases the same work ends up on several workers' lists. The manager's job is to prevent that duplicated work.
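The deduplication step the manager performs can be sketched as a small pure function. This is a minimal sketch under my own assumptions: work items are hashable, the manager keeps a set of every item already handed out, and `claim_new_items` is a hypothetical helper name, not part of any library.

```python
def claim_new_items(candidates, claimed):
    """Return the candidates nobody has claimed yet and mark them as claimed.

    candidates: iterable of hashable work items proposed by one worker.
    claimed: set of every item already assigned to some worker (mutated in place).
    """
    accepted = []
    for item in candidates:
        if item not in claimed:
            claimed.add(item)  # the first worker to propose an item gets it
            accepted.append(item)
    return accepted


if __name__ == "__main__":
    claimed = {0, 1, 2}  # items from the workers' initial (distinct) lists
    print(claim_new_items([2, 3, 4], claimed))     # worker A proposes 2, 3, 4
    print(claim_new_items([3, 4, 5, 5], claimed))  # worker B overlaps with worker A
```

Because the set lookup is O(1) on average, the cost per proposed item stays constant no matter how much work has already been assigned.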

I'm currently trying to do this with Python multiprocessing and pipes. Here is my code, which only tests the communication:
```python
import multiprocessing as mp
import random
import time


def checkManager(i, data_pipe, com_pipe, work_list):
    """Answer a pending request from the manager, if there is one."""
    com_out, com_in = com_pipe
    if com_out.poll():  # check if there is anything to read
        msg = com_out.recv()
        print(i, "received message", msg)
        if msg == "SEND":
            data_out, data_in = data_pipe
            data_in.send(work_list)     # send the current list to the manager
            work_list = com_out.recv()  # wait for the manager's updated list
    return work_list


def myfunc(i, data_pipe, com_pipe, work_list):
    print("starting worker", i, "with list:", work_list)
    while work_list:
        time.sleep(3)  # sleep just to simulate some work delay
        # check whether the manager wants to communicate
        work_list = checkManager(i, data_pipe, com_pipe, work_list)
    print("stopping worker", i)


if __name__ == "__main__":  # required when processes are started with "spawn" (Windows, macOS)
    print("Starting...")
    data_pipe = mp.Pipe()  # pipe to receive work lists from all workers
    pipes = []  # communication pipe for each worker
    workers = []
    # spawn workers
    for i in range(4):
        pipe = mp.Pipe()  # communication pipe for this process
        r = random.randint(10, 100)  # random list length, just to test
        p = mp.Process(target=myfunc, args=(i, data_pipe, pipe, list(range(r))))
        pipes.append(pipe)
        workers.append(p)
        p.start()

    stopped_workers = []
    data_out, data_in = data_pipe
    while len(stopped_workers) != len(workers):
        time.sleep(2)
        for i in range(len(workers)):
            if i in stopped_workers:  # skip workers that have already stopped
                continue
            r = random.randint(0, 100)  # avoid sending the request every time
            if r > 80:
                print("Communicating with", i)
                output, input_ = pipes[i]
                input_.send("SEND")  # send message
                work_list = data_out.recv()  # block until data arrives
                print("data:", work_list)
                input_.send([])  # send an empty list to test whether the worker stops
                stopped_workers.append(i)  # record that this worker has stopped
    for p in workers:
        p.join()  # wait for every worker process to exit
    print("Stopping main")
```
Everything works fine in this simple test; however, I would like this to be as efficient as possible, and there are some things I don't like about my code.
First of all, I think it would be more efficient to have a mechanism that sends a signal to the worker processes instead of having them poll a function from time to time. I tried to use signals but never managed to get them working properly. In addition, I create one pipe per process, and I'm not sure that is the best solution. I have looked at some examples using multiprocessing.Pool, but that didn't look like a good fit for my problem.
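One way to avoid both polling and signals is to invert the conversation: instead of the manager interrupting a worker, each worker sends its newly generated items to the manager whenever it has some and blocks on the reply, while the manager blocks in `multiprocessing.connection.wait()` (available since Python 3.3) on all worker connections at once. This is a swapped-in design, not the code above, and a minimal sketch under stated assumptions: `expand`, `LIMIT`, `claim_new_items`, and the `("propose", ...)`/`("done", ...)` message format are hypothetical stand-ins for the real work and protocol. It keeps one duplex `Pipe` per worker, since each worker needs a private reply channel anyway.

```python
import multiprocessing as mp
from multiprocessing.connection import wait

LIMIT = 50  # hypothetical bound so that the simulated work terminates


def expand(item):
    """Hypothetical stand-in for real work: processing an item yields new candidates."""
    return [x for x in (item + 3, item * 2) if x < LIMIT]


def claim_new_items(candidates, claimed):
    """Keep only unclaimed candidates and record them as claimed."""
    accepted = []
    for item in candidates:
        if item not in claimed:
            claimed.add(item)
            accepted.append(item)
    return accepted


def worker(conn, work_list):
    done = []
    while work_list:
        item = work_list.pop()
        done.append(item)
        new_items = expand(item)
        if new_items:
            conn.send(("propose", new_items))  # ask the manager which items are ours
            work_list.extend(conn.recv())      # block until the manager replies
    conn.send(("done", done))  # report what this worker processed, then exit
    conn.close()


def manager(initial_lists):
    claimed = set()
    for lst in initial_lists:
        claimed.update(lst)  # assumes the initial lists are disjoint
    conns, procs = [], []
    for lst in initial_lists:
        parent_end, child_end = mp.Pipe()
        p = mp.Process(target=worker, args=(child_end, list(lst)))
        p.start()
        child_end.close()  # only the worker should hold its end of the pipe
        conns.append(parent_end)
        procs.append(p)
    results = []
    while conns:
        for conn in wait(conns):  # sleeps until at least one worker has sent something
            kind, payload = conn.recv()
            if kind == "propose":
                conn.send(claim_new_items(payload, claimed))
            else:  # "done"
                results.append(payload)
                conns.remove(conn)
    for p in procs:
        p.join()
    return results


if __name__ == "__main__":
    results = manager([[0, 1], [2, 3], [4, 5], [6, 7]])
    processed = [item for r in results for item in r]
    # equal counts mean no item was processed twice
    print(len(processed), len(set(processed)))
```

With this layout a worker only talks to the manager when it actually has new work to propose, and the manager uses no CPU while waiting, so there is no `sleep`/`poll` loop on either side. The cost is one round trip per proposal; batching several new items per message reduces it.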
Finally, would it be better to implement everything using the Python MPI library?
Thanks in advance.
You might be happier with a convenience framework like Circuits. It strongly emphasizes (among other things) the concepts of components and event signaling, and it also handles multiprocessing rather well.