python multiprocess update dictionary synchronously

I am trying to update a single shared dictionary from multiple processes. Could you please help me figure out what is wrong with this code? I get the following output:

inside function
{1: 1, 2: -1}
comes here
inside function
{1: 0, 2: 2}
comes here
{1: 0, 2: -1}

Thanks.

from multiprocessing import Lock, Process, Manager

l= Lock()


def computeCopyNum(test,val):
    l.acquire()
    test[val]=val
    print "inside function"
    print test
    l.release()
    return

a=dict({1: 0, 2: -1})

procs=list()

for i in range(1,3):
    p = Process(target=computeCopyNum, args=(a,i))
    procs.append(p)
    p.start()

for p in procs:
    p.join()
    print "comes here"

print a

1 Answer

The answer is actually quite simple: you're using the multiprocessing module, which starts several separate Python processes. Separate processes have separate address spaces and do not share memory, so each of your processes writes to its own local copy of the dictionary (the dict you pass in args is copied into each child process, not shared).

The easiest way to do inter-process communication when using the multiprocessing module is to use a queue to communicate between the slave processes and the master process.

from multiprocessing import Process, Queue

def computeCopyNum(queue, val):
    queue.put(val) # could also put a (process id, value) tuple if we wanted to

procs=list()

queue = Queue()
for i in range(1,3):
    p = Process(target=computeCopyNum, args=(queue, i))
    procs.append(p)
    p.start()

for _ in procs:
    val = queue.get()
    # do whatever with val

for p in procs:
    p.join()
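
If the goal is still to end up with one updated dictionary, as in the question, the master can rebuild it from the values it reads off the queue. Here is a minimal sketch of that idea (the (key, value) convention used here is just for illustration):

from multiprocessing import Process, Queue

def computeCopyNum(queue, val):
    # send a (key, value) pair back to the master instead of
    # writing into a shared dictionary
    queue.put((val, val))

if __name__ == '__main__':
    queue = Queue()
    procs = [Process(target=computeCopyNum, args=(queue, i)) for i in range(1, 3)]
    for p in procs:
        p.start()

    a = {1: 0, 2: -1}             # the dictionary stays in the master process
    for _ in procs:
        key, value = queue.get()  # blocks until some worker has produced a result
        a[key] = value

    for p in procs:
        p.join()

    print(a)                      # {1: 1, 2: 2}

The if __name__ == '__main__': guard is only there so the sketch also works on platforms that spawn new interpreters instead of forking.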

If each slave process can generate multiple output values, it might be prudent to let each one write a sentinel value to the queue to signal to the master that it's done. The code might then look something like this:

from multiprocessing import Process, Queue

def slave(queue):
    for i in range(128): # just for example
        val = i * i # placeholder for whatever is actually calculated
        queue.put(val)

    queue.put(None) # add a sentinel value to tell the master we're done

queue = Queue()

# spawn 32 slave processes
num_procs = 32
procs = [Process(target=slave, args=(queue, )) for _ in range(num_procs)]
for proc in procs: 
    proc.start()

finished = 0
while finished < num_procs:
    item = queue.get()
    if item is None:
        finished += 1 # one more slave has signalled that it is done
    else:
        pass # do something with item here

for proc in procs: 
    proc.join()

You can also use a Manager to share the dictionary directly between processes. The drawback of that approach is that every access to the shared dict goes through a separate manager process, which means a lot of implicit copying between process address spaces, and that can be hard to reason about. I always prefer using explicit queues.
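
For reference, here is a minimal sketch of the Manager-based approach applied to the code from the question: both the dictionary and the lock are created through the Manager, so every process really does operate on the same shared objects.

from multiprocessing import Process, Manager

def computeCopyNum(lock, test, val):
    with lock:              # the lock has to be a shared (manager) lock as well
        test[val] = val
        print(dict(test))   # copy the proxy into a plain dict for printing

if __name__ == '__main__':
    manager = Manager()
    lock = manager.Lock()
    a = manager.dict({1: 0, 2: -1})  # lives in the manager process, shared via a proxy

    procs = [Process(target=computeCopyNum, args=(lock, a, i)) for i in range(1, 3)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

    print(dict(a))          # {1: 1, 2: 2} - the updates are visible in the parent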
