Share an evolving dict between processes

Question presentation

I'm facing a multiprocessing problem. Most multiprocessing questions on Stack Overflow are less complex than my situation and don't answer it. Some people voted this as a possible duplicate of this question, but mine is different: in my situation, the shared DICT is modified between the workers' jobs:

I have a program that follows this simplified life cycle:

A. Initialize DATA dict
B. Initialize 4 subprocess workers
C. Execute code in each workers (worker massively read DATA dict)
D. Wait workers job is done
E. Modify DATA dict content
F. Go to C

Performance is a very important aspect of the problem. I experimented with several solutions, each with positive and negative points:

Simple global dict (not working)

At step B the DICT variable is forked into the subprocesses' environment, but after step E the subprocesses can't see the changes.
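
A minimal sketch of why this fails (names are illustrative; assumes the fork start method on Linux): the child receives a snapshot of DATA at fork time, so later changes in the parent are invisible to it.

import multiprocessing
import time

DATA = {'positions': [42, 165]}  # step A

def worker():
    time.sleep(1)  # give the parent time to run step E
    print(DATA)    # still prints {'positions': [42, 165]}

if __name__ == '__main__':
    p = multiprocessing.Process(target=worker)
    p.start()                      # step B: DATA is copied into the child
    DATA['positions'].append(322)  # step E: only the parent's copy changes
    p.join()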

Use multiprocessing.Manager dict

At step A the dict is created with multiprocessing.Manager (see "Server process" in the multiprocessing docs); a minimal sketch follows the pros/cons below.

  • Pros: Easy to use
  • Cons: multiprocessing.Manager uses a serialization layer (I don't know it well, but it can even work with processes across a network), which is bad for performance.
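
For illustration, a minimal sketch of this approach (the worker function and key names are mine, not from the question):

import multiprocessing

def worker(shared):
    # Every access is a round trip to the manager's server process,
    # which is where the serialization cost comes from.
    print(shared['positions'])

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    shared = manager.dict()  # step A
    shared['positions'] = [42, 165]
    p = multiprocessing.Process(target=worker, args=(shared,))  # step B
    p.start()                             # step C
    p.join()                              # step D
    shared['positions'] = [42, 165, 322]  # step E: visible through the
                                          # proxy to all workers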

Use multiple multiprocessing.Value and multiprocessing.Array instead of a dict

multiprocessing.Value and multiprocessing.Array allow the use of shared memory. I tried to replace my dict with several multiprocessing.Value and multiprocessing.Array objects like this:

With dict:

manager = multiprocessing.Manager()
dict = manager.dict()
dict['positions'] = [42, 165]
dict['on_position_42'] = 1555897
dict['on_position_165'] = 1548792

Replaced dict with multiprocessing.Value and multiprocessing.Array:

positions = multiprocessing.Array('i', [42, 165])
on_position_42 = multiprocessing.Value('i', 1555897)
on_position_165 = multiprocessing.Value('i', 1548792)

But at step E I will need to create new multiprocessing.Value and multiprocessing.Array objects, for example:

# multiprocessing.Array is fixed-size, so it must be re-created to grow
positions = multiprocessing.Array('i', [42, 165, 322])
# create a new multiprocessing.Value for position 322
on_position_322 = multiprocessing.Value('i', 2258777)

Then at step C, on_position_322 will be unknown to the workers. If I try to send a multiprocessing.Value or multiprocessing.Array to the subprocesses through pipes, it results in a "Synchronized objects should only be shared between processes through inheritance" error (reproduced in the sketch after the pros/cons below).

  • Pros: Performance
  • Cons: how to "inform" the subprocesses about the existence of new multiprocessing.Value and multiprocessing.Array objects?
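
For reference, a minimal sketch reproducing that error: sending a synchronized object over a pipe pickles it, which is forbidden outside of process inheritance.

import multiprocessing

if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()
    on_position_322 = multiprocessing.Value('i', 2258777)
    try:
        parent_conn.send(on_position_322)  # send() pickles its argument
    except RuntimeError as exc:
        # "Synchronized objects should only be shared between
        # processes through inheritance"
        print(exc)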

Use an in-memory database like memcached or Redis

I know it's a possibility, but I would have to benchmark an in-memory database against the multiprocessing.Manager dict (a minimal Redis sketch follows the pros/cons below).

  • Pros: pragmatic, and it works
  • Cons: performance?
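
For example, a minimal sketch with Redis (requires the redis package and a server running on localhost; the hash name is hypothetical):

import redis

r = redis.Redis(host='localhost', port=6379)

# Steps A/E: the main process writes the shared state into a hash.
r.hset('on_position', '42', 1555897)
r.hset('on_position', '165', 1548792)

# Step C: any worker process can read the current state.
print(int(r.hget('on_position', '42')))  # 1555897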

Question conclusion

Is there a way to use multiprocessing.Value and multiprocessing.Array in this life cycle, considering the creation of new multiprocessing.Value and multiprocessing.Array objects?

Or, more generally, what would be the most performant strategy for this life cycle?

Note: I previously tried another strategy where step F was a "Go to B" (re-creating new workers at each cycle), but forking the workers' environment took too long: the bigger the DICT, the longer the fork.

asked Aug 30 '17 by bux

1 Answer

Since you're only reading the dictionary in the workers and updating it in the main process, you can use a JoinableQueue to pass the dictionary and wait for the workers to finish. Each put() pickles the dictionary, so every cycle the workers receive a fresh copy that includes the main process's updates. E.g.

from multiprocessing import Process, JoinableQueue
import time

class Worker(Process):
    def __init__(self, queue):
        super(Worker, self).__init__()
        self.queue = queue

    def run(self):
        # Process items until a None sentinel is received.
        for item in iter(self.queue.get, None):
            print(item)
            time.sleep(2)
            print('done')
            self.queue.task_done()
        self.queue.task_done()  # account for the sentinel itself

if __name__ == '__main__':
    request_queue = JoinableQueue()
    num_workers = 4
    workers = []
    d = {}  # A

    for _ in range(num_workers): 
        p = Worker(request_queue) # B
        workers.append(p)
        p.start()


    for i in range(5): # F
        for _ in range(num_workers):
            request_queue.put(d)  # C: sends a pickled copy of d
        request_queue.join()  # D
        d[i] = i  # E

    for w in workers:
        w.terminate()
        w.join()

Output:

{}
{}
{}
{}
done
done
done
done
{0: 0}
{0: 0}
{0: 0}
{0: 0}
done
done
done
done
{0: 0, 1: 1}
{0: 0, 1: 1}
{0: 0, 1: 1}
{0: 0, 1: 1}
done
done
done
done
{0: 0, 1: 1, 2: 2}
{0: 0, 1: 1, 2: 2}
{0: 0, 1: 1, 2: 2}
{0: 0, 1: 1, 2: 2}
done
done
done
done
{0: 0, 1: 1, 2: 2, 3: 3}
{0: 0, 1: 1, 2: 2, 3: 3}
{0: 0, 1: 1, 2: 2, 3: 3}
{0: 0, 1: 1, 2: 2, 3: 3}
done
done
done
done
answered Oct 12 '22 by slam08