
Python dictionary of queues for inter-process communication

Tags:

python

This is not very important, just a silly experiment. I would like to build my own message passing: a dictionary of queues, where each key is the PID of a process. Each process (created by Process()) could then send a message to another process by putting it in that process's queue, knowing its PID. This is a silly code:

from multiprocessing import Process, Manager, Queue
from os import getpid
from time import sleep

def begin(dic, manager, parentQ):
    parentQ.put(getpid())
    dic[getpid()] = manager.Queue()
    dic[getpid()].put("Something...")

if __name__== '__main__':
    manager = Manager()
    dic = manager.dict()
    parentQ = Queue()

    p = Process(target = begin, args=(dic, manager, parentQ))
    p.start()
    son = parentQ.get()
    print son
    sleep(2)
    print dic[son].get()

The line dic[getpid()] = manager.Queue() works fine, but when I call put()/get() on dic[son] I get this message:

Process Process-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "mps.py", line 8, in begin
    dic[getpid()].put("Something...")
  File "<string>", line 2, in __getitem__
  File "/usr/lib/python2.7/multiprocessing/managers.py", line 773, in _callmethod
    raise convert_to_error(kind, result)
RemoteError: 
---------------------------------------------------------------------------
Unserializable message: ('#RETURN', <Queue.Queue instance at 0x8a92d0c>)
---------------------------------------------------------------------------

Do you know the right way to do this?

Asked Nov 13 '22 by user2204592

1 Answer

I believe your code is failing because Queues are not serializable, just as the traceback says. The multiprocessing.Manager() object can create a shared dict for you without a problem, as you've done here, but values stored in the dict still need to be serializable (picklable, in Pythonese). If you're okay with the subprocesses not having access to each other's queues, then this should work for you:

from multiprocessing import Process, Manager, Queue
from os import getpid

number_of_subprocesses_i_want = 5

def begin(myQ):
    # Each child only needs its own queue, which was passed in at creation.
    myQ.put("Something sentimental from your friend, PID {0}".format(getpid()))
    return

if __name__== '__main__':
    queue_dic = {}
    queue_manager = Manager()

    process_list = []

    for i in xrange(number_of_subprocesses_i_want):
        child_queue = queue_manager.Queue()

        p = Process(target = begin, args=(child_queue,))
        p.start()
        # Map each child's PID to its queue after start(), once the PID is known.
        queue_dic[p.pid] = child_queue
        process_list.append(p)

    for p in process_list:
        print(queue_dic[p.pid].get())
        p.join()
This leaves you with a dictionary whose keys are the child processes' PIDs, and whose values are their respective queues, which can be used from the main process.
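To see the serialization limitation directly: a plain Queue holds thread locks internally, and pickle refuses to serialize those. A quick check (written in Python 3 syntax here for illustration; the module is named Queue in the Python 2 environment the question uses):

```python
import pickle
import queue  # named "Queue" in Python 2

q = queue.Queue()
try:
    pickle.dumps(q)           # a Queue contains thread locks...
    picklable = True
except TypeError as exc:      # ...which pickle cannot serialize
    picklable = False
    print("cannot pickle a Queue:", exc)
```

This is the same failure the manager hits when it tries to ship the raw Queue.Queue instance back to the child in the traceback above.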

I don't think your original goal is achievable with queues, because a queue that a subprocess uses must be passed to it when the process is created. As you launch more processes, you have no way to give an existing process access to a new queue.

One possible way to have inter-process communication would be to have everyone share a single queue to pass messages back to your main process bundled with some kind of header, such as in a tuple:

(destination_pid, sender_pid, message)

...and have the main process read the destination_pid and forward (sender_pid, message) to that subprocess's queue. Of course, this implies that you need a way to notify existing processes when a new process becomes available to communicate with.

Answered Nov 14 '22 by skrrgwasme