
Using a list with Python multiprocessing

Can anyone help me share a list between multiple Python processes? The problem is getting self.ID_List and self.mps_in_process to work in the following code.

import time, random
from multiprocessing import Process #, Manager, Array, Queue

class MP_Stuff():
    def __init__(self, parent, id):
        time.sleep(1 + random.random()*10) # simulate data processing
        parent.killMP(id)

class ParamHandler():
    def doFirstMP(self, IDs):
        self.mps_in_process = []
        self.ID_List = IDs
        id = self.ID_List.pop(0)
        p = Process(target=MP_Stuff, args=(self, id))
        self.mps_in_process.append(id)
        p.start()

    def doMP(self):
        for tmp in range(3): # nr of concurrent processes
            if len(self.ID_List) > 0:
                id = self.ID_List.pop(0)
                p = Process(target=MP_Stuff, args=(self, id))
                self.mps_in_process.append(id)
                p.start()

    def killMP(self, kill_id):
        self.mps_in_process.remove(kill_id)
        self.doMP()

if __name__ == '__main__':
    ID_List = [1,2,3,4,5,6]
    paramSet = ParamHandler()
    paramSet.doFirstMP(ID_List)

In short, the code processes some data (simulated here by a random sleep in MP_Stuff) for each data id in self.ID_List. self.mps_in_process tracks which data ids are currently being processed (the number of concurrent processes is hardcoded here, but in practice it is dynamic).

The problem is sharing mps_in_process and ID_List across multiple processes. The current code goes into a more or less endless loop. What goes wrong is actually described well in the multiprocessing documentation:

"if code run in a child process tries to access a global variable, then the value it sees (if any) may not be the same as the value in the parent process at the time that Process.start() was called."

However, I cannot figure out how to get mps_in_process and ID_List working. I cannot use Queue, because elements are removed from mps_in_process in arbitrary order. I cannot use Array, because it does not support .pop(0). I cannot use Manager().list(), because .remove() and len(ID_List) do not work with it. Using threading instead of multiprocessing is not an option, because freeze_support() must be used later.

So any help with sharing a list among processes is very welcome!

bitman asked Dec 01 '10 14:12



1 Answer

The Manager works fine (including len()). The issue with your code is that the main process does not wait for the processing to finish, so it exits and the manager is no longer accessible. Also, I don't know whether ListProxy's pop is atomic, so a lock might be useful.
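To illustrate, here is a minimal sketch (not the asker's code) showing that a list proxy accepts len(), .pop(0) and .remove(), with a manager lock guarding the check-then-pop step. The helper name take_next is hypothetical, and the `with Manager()` form assumes Python 3.3 or later.

```python
from multiprocessing import Manager


def take_next(ids, lock):
    """Pop the first pending id while holding the lock; return None if empty."""
    with lock:
        if len(ids) > 0:
            return ids.pop(0)
        return None


if __name__ == "__main__":
    with Manager() as manager:
        ids = manager.list([1, 2, 3, 4, 5, 6])  # shared list proxy
        lock = manager.Lock()                   # shared lock proxy

        first = take_next(ids, lock)
        ids.remove(4)                           # remove() is forwarded to the manager
        print(first, len(ids), list(ids))       # prints: 1 4 [2, 3, 5, 6]
```

The lock matters only because "check the length, then pop" is two separate calls to the manager; another process could empty the list in between.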

The solution is to call p.join().
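As a rough sketch of the join idea in isolation, the example below keeps every Process object and joins all of them before the manager goes away. Note that it swaps in a different design from the question: a fixed set of hypothetical worker processes drains the shared list instead of each job spawning its successors. The names worker and done are made up for illustration.

```python
import random
import time
from multiprocessing import Manager, Process


def worker(ids, done):
    """Hypothetical worker: drain the shared id list, recording finished ids."""
    while True:
        try:
            item = ids.pop(0)      # one call, so no separate length check is needed
        except IndexError:         # list is empty, nothing left to do
            return
        time.sleep(random.random() * 0.1)  # simulate data processing
        done.append(item)


if __name__ == "__main__":
    with Manager() as manager:
        ids = manager.list([1, 2, 3, 4, 5, 6])
        done = manager.list()
        procs = [Process(target=worker, args=(ids, done)) for _ in range(3)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()               # keep the manager alive until every worker is done
        print(sorted(done))        # prints: [1, 2, 3, 4, 5, 6]
```

Without the join loop the `with` block would end right after the workers start, and they would lose their connection to the manager, which is the failure described above.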

However, I am confused about why it is enough to call p.join() at the end of doFirstMP. I would be happy if somebody could explain why join on the first p returns only after all computation is done, rather than as soon as the first doMP returns.
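A likely explanation, going by the note in the multiprocessing documentation that non-daemonic processes are joined automatically: when a process started by multiprocessing finishes its target, it joins its own non-daemonic children before it exits. Since each MP_Stuff starts its successors from inside the child process, the first p cannot exit until the whole chain has finished. This is my reading rather than something confirmed for this exact code; the sketch below, with hypothetical names child, grandchild and timed_join, should show the effect.

```python
import time
from multiprocessing import Process


def grandchild(delay):
    time.sleep(delay)  # stands in for the remaining data processing


def child(delay):
    # Start a non-daemonic grandchild and return right away without joining it.
    Process(target=grandchild, args=(delay,)).start()


def timed_join(delay):
    """Return how long join() on the child takes, in seconds."""
    p = Process(target=child, args=(delay,))
    start = time.time()
    p.start()
    p.join()  # returns only once the child process has actually exited
    return time.time() - start


if __name__ == "__main__":
    print("join returned after %.1f s" % timed_join(1.0))
```

If the explanation holds, the reported time is at least the grandchild's delay, even though child() itself returns almost immediately.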

My code:

import time, random
from multiprocessing import Process, Manager

class MP_Stuff():
    def __init__(self, parent, id):
        time.sleep(1 + random.random()*5) # simulate data processing
        print(id, "done")
        parent.killMP(id)

class ParamHandler():
    def doFirstMP(self, IDs):
        self.mps_in_process = [] # plain list: each process works on its own copy
        self.ID_List = Manager().list(IDs) # list proxy shared between processes
        id = self.ID_List.pop(0)
        p = Process(target=MP_Stuff, args=(self, id))
        self.mps_in_process.append(id)
        p.start()
        p.join() # wait, so the main process and the manager stay alive
        print("joined")

    def doMP(self):
        for tmp in range(3): # nr of concurrent processes
            print(self.ID_List)
            if len(self.ID_List) > 0:
                id = self.ID_List.pop(0)
                p = Process(target=MP_Stuff, args=(self, id))
                self.mps_in_process.append(id)
                p.start()

    def killMP(self, kill_id):
        print("kill", kill_id)
        self.mps_in_process.remove(kill_id)
        self.doMP() # runs in the finished child, which starts the next processes

if __name__ == '__main__':
    ID_List = [1,2,3,4,5,6]
    paramSet = ParamHandler()
    paramSet.doFirstMP(ID_List)
Krab answered Oct 20 '22 08:10
