 

Best way to implement this workflow in Python

I would like to implement the following workflow in Python. In the example I have 5 processes running in parallel. One process is defined as the manager, while the others are all workers. The manager performs the control routine multiple times in a round-robin fashion until all workers stop.

The main idea is that each worker starts with a distinct list of work. However, while processing their work, the workers add more work to their own lists, and in some cases the same work item ends up on more than one worker's list. The manager's job is to avoid that duplicated work.
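The manager's deduplication role can be sketched as a registry that remembers which worker first claimed each work item. This is a minimal sketch, assuming work items are hashable; `WorkRegistry` and `claim` are hypothetical names that do not appear in the question's code.

```python
class WorkRegistry:
    """Hypothetical manager-side record of which worker owns each work item."""

    def __init__(self):
        self.owner = {}  # work item -> id of the first worker that claimed it

    def claim(self, worker_id, items):
        """Return the items worker_id may keep, preserving their order.

        An item is kept if nobody owned it yet (it is then recorded as owned
        by worker_id) or if worker_id already owns it. Items owned by another
        worker, and repeats within `items`, are dropped.
        """
        accepted, kept = [], set()
        for item in items:
            if self.owner.setdefault(item, worker_id) == worker_id and item not in kept:
                kept.add(item)
                accepted.append(item)
        return accepted


# Usage: worker 1 rediscovers item 3, which worker 0 already owns.
registry = WorkRegistry()
print(registry.claim(0, [1, 2, 3]))  # -> [1, 2, 3]
print(registry.claim(1, [3, 4, 5]))  # -> [4, 5]
```

In the question's manager loop, the list returned by `claim(i, work_list)` could be sent back to worker `i` in place of the empty list the test code sends. Ownership is never released, so an item that one worker has already finished is not handed to another.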

[Image: Workflow overview]

I'm currently trying to do this with Python multiprocessing and pipes. Here is my code, which only tests the communication:

import multiprocessing as mp
import time
import random


def checkManager(i, data_pipe, com_pipe, work_list):
    com_out, com_in = com_pipe
    if com_out.poll():  # check if there is anything to read
        msg = com_out.recv()
        print(i, "received message", msg)
        if msg == "SEND":
            data_out, data_in = data_pipe
            data_in.send(work_list)     # hand the current list to the manager
            work_list = com_out.recv()  # wait for the manager's replacement list
    return work_list


def myfunc(i, data_pipe, com_pipe, work_list):
    print("starting worker", i, "with list:", work_list)
    while work_list != []:
        time.sleep(3)  # sleep just to simulate some work delay
        work_list = checkManager(i, data_pipe, com_pipe, work_list)  # check if the manager wants to communicate
    print("stopping worker", i)


if __name__ == "__main__":  # required when processes are started with "spawn" (e.g. on Windows)
    print("Starting...")

    data_pipe = mp.Pipe()  # pipe to receive work lists from all workers
    pipes = []  # communication pipe for each worker
    workers = []

    # spawn workers
    for i in range(4):
        pipe = mp.Pipe()  # communication pipe for this process
        r = random.randint(10, 100)  # create a random list just to test
        # list(...) so that the "!= []" check in myfunc works (a range object never equals [])
        p = mp.Process(target=myfunc, args=(i, data_pipe, pipe, list(range(r))))
        pipes.append(pipe)
        workers.append(p)
        p.start()

    stopped_workers = []
    data_out, data_in = data_pipe
    while len(stopped_workers) != len(workers):
        time.sleep(2)
        for i in range(len(workers)):
            if i in stopped_workers:  # skip workers that have already stopped
                continue
            r = random.randint(0, 100)  # just to avoid sending the request every time
            if r > 80:
                print("Communication with", i)
                _, conn = pipes[i]
                conn.send("SEND")  # send message
                work_list = data_out.recv()  # block until data is received
                print("data:", work_list)
                conn.send([])  # send an empty list just to test whether the worker stops
                stopped_workers.append(i)  # record that this worker has stopped

    for p in workers:
        p.join()  # wait for every worker process to exit
    print("Stopping main")

Everything works fine in this simple test; however, I would like this to be as efficient as possible, and there are some things I don't like about my code.

First of all, I think it would be more efficient to have a mechanism that signals the worker processes, instead of having them call a check function from time to time. I tried to use signals but never managed to get them working properly. In addition, I create as many pipes as there are processes, and I'm not sure that is the best solution. I have looked at some examples using multiprocessing.Pool, but that didn't look like a good fit for my problem.
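One common alternative to polling plus one pipe per worker is to let blocking queue operations do the waiting. This sketch is a different design from the one in the question, not a drop-in fix: all workers pull items from one shared `multiprocessing.Queue`, report each finished item together with any newly discovered work on a second shared queue, and the manager drops duplicates before re-queuing. It assumes work items are hashable and that any worker may process any item, which may not hold if each worker must keep its own list. `expand`, `worker` and `run_manager` are hypothetical names, and `expand` only simulates work.

```python
import multiprocessing as mp


def expand(item):
    """Hypothetical stand-in for real work: processing item n discovers
    n // 2 and n // 3 as new work, so duplicates arise naturally."""
    return [child for child in (item // 2, item // 3) if child > 0]


def worker(tasks, results):
    # tasks.get() blocks until an item is available, so no sleep/poll loop.
    while True:
        item = tasks.get()
        if item is None:  # sentinel from the manager: no more work
            break
        results.put((item, expand(item)))


def run_manager(initial_items, n_workers=4):
    tasks = mp.Queue()    # manager -> whichever worker is idle
    results = mp.Queue()  # all workers -> manager (one queue, not one pipe each)
    procs = [mp.Process(target=worker, args=(tasks, results))
             for _ in range(n_workers)]
    for p in procs:
        p.start()

    seen = set()  # every item ever scheduled: this is the deduplication
    pending = 0   # items scheduled but not yet reported back
    done = []

    def schedule(items):
        nonlocal pending
        for item in items:
            if item not in seen:
                seen.add(item)
                tasks.put(item)
                pending += 1

    schedule(initial_items)
    while pending:  # stop once nothing is queued or in progress
        item, new_items = results.get()  # blocks until some worker reports
        pending -= 1
        done.append(item)
        schedule(new_items)

    for _ in procs:  # one sentinel per worker
        tasks.put(None)
    for p in procs:
        p.join()
    return sorted(done)


if __name__ == "__main__":
    print(run_manager([100, 60, 45]))
```

Because both `get()` calls block, neither side sleeps or polls, and the number of pipes no longer grows with the number of workers. The sketch does not handle a worker that crashes: the manager would then wait forever on `results.get()`.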

Finally, would it be better to implement everything using a Python MPI library?

Thanks in advance

asked May 14 '26 at 06:05 by Jorge Silva



1 Answer

You might be happier with a convenience framework like Circuits. It strongly emphasizes (among other things) components and event signaling, and it also handles multiprocessing rather well.

answered May 16 '26 at 20:05 by riot



