This is not very important, just a silly experiment: I would like to build my own message passing. The idea is a dictionary of queues in which each key is the PID of a process. Processes created with Process() would then exchange messages by putting them into the queue of the process they want to reach, identified by its PID. Here is my silly code:
from multiprocessing import Process, Manager, Queue
from os import getpid
from time import sleep

def begin(dic, manager, parentQ):
    parentQ.put(getpid())
    dic[getpid()] = manager.Queue()
    dic[getpid()].put("Something...")

if __name__== '__main__':
    manager = Manager()
    dic = manager.dict()
    parentQ = Queue()

    p = Process(target = begin, args=(dic, manager, parentQ))
    p.start()
    son = parentQ.get()
    print son
    sleep(2)
    print dic[son].get()
The assignment dic[getpid()] = manager.Queue() works fine. But when I call dic[son].put() or dic[son].get(), I get this message:
Process Process-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "mps.py", line 8, in begin
    dic[getpid()].put("Something...")
  File "<string>", line 2, in __getitem__
  File "/usr/lib/python2.7/multiprocessing/managers.py", line 773, in _callmethod
    raise convert_to_error(kind, result)
RemoteError:
---------------------------------------------------------------------------
Unserializable message: ('#RETURN', <Queue.Queue instance at 0x8a92d0c>)
---------------------------------------------------------------------------
Do you know the right way to do this?
I believe your code is failing because Queue objects are not serializable, just as the traceback says. The multiprocessing.Manager() object can create a shared dict for you without a problem, as you've done here, but the values stored in that dict still need to be serializable (picklable, in Python terms). If you're okay with the subprocesses not having access to each other's queues, then this should work for you:
from multiprocessing import Process, Manager
from os import getpid

number_of_subprocesses_i_want = 5


def begin(myQ):
    # Runs in the child: report back through the queue it was handed at creation.
    myQ.put("Something sentimental from your friend, PID {0}".format(getpid()))


if __name__ == '__main__':
    queue_dic = {}  # child PID -> that child's queue (plain dict, main process only)
    queue_manager = Manager()
    process_list = []
    for i in range(number_of_subprocesses_i_want):
        child_queue = queue_manager.Queue()
        p = Process(target=begin, args=(child_queue,))
        p.start()
        queue_dic[p.pid] = child_queue  # p.pid is only set once the process has started
        process_list.append(p)
    for p in process_list:
        print(queue_dic[p.pid].get())
        p.join()
This leaves you with a dictionary whose keys are the PIDs of the child processes and whose values are their respective queues, which can be used from the main process.
I don't think your original goal is achievable with queues, because any queue you want a subprocess to use must be passed to that process when it is created. As you launch more processes, you have no way to give an existing process access to a new queue.
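For what it's worth, here is a small sketch of the pickling restriction, written for Python 3 (where the module is named queue rather than Queue). The helper pickling_error is a name made up for this illustration; it only reports which exception pickle raises for a plain queue and for a multiprocessing.Queue.

```python
import multiprocessing
import pickle
import queue


def pickling_error(obj):
    """Return the exception raised while pickling obj, or None if it pickles fine."""
    try:
        pickle.dumps(obj)
    except Exception as exc:
        return exc
    return None


# A plain thread-level queue, like the Queue.Queue named in the traceback.
plain_error = pickling_error(queue.Queue())
# A multiprocessing queue, pickled outside of process creation.
shared_error = pickling_error(multiprocessing.Queue())

print(type(plain_error).__name__, plain_error)
print(type(shared_error).__name__, shared_error)
```

On CPython 3 I would expect this to report a TypeError for the plain queue (it holds thread locks) and a RuntimeError for the multiprocessing one, which only lets itself be handed to a child while that child is being created.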
One possible way to get inter-process communication is to have every process share a single queue that passes messages back to your main process, each bundled with some kind of header, such as a tuple:
(destination_pid, sender_pid, message)
...and have main read the destination_pid and forward (sender_pid, message) to that subprocess's queue. Of course, this implies that you need a way to notify existing processes when a new process becomes available to communicate with.
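A minimal sketch of that routing idea, assuming Python 3 on a Unix-like system (it asks for the fork start method explicitly). The names worker, route_demo, inbox and outbox are made up for this example, and using None as the destination to mean "this one is for main" is just a convention I picked. Main also tells each child its peer's PID through the child's own inbox, which is one simple way to handle the notification problem.

```python
import multiprocessing
from os import getpid

# Assumption: a Unix-like system where the "fork" start method exists.
ctx = multiprocessing.get_context("fork")


def worker(inbox, outbox):
    """Child: learn the peer's PID, greet it via main, then report what arrived."""
    peer_pid = inbox.get()                     # first item from main: who to talk to
    outbox.put((peer_pid, getpid(), "hello"))  # (destination_pid, sender_pid, message)
    sender_pid, message = inbox.get()          # forwarded to us by main
    outbox.put((None, getpid(), (sender_pid, message)))  # None = addressed to main


def route_demo():
    """Start two children, route their messages, return (first, second, reports)."""
    outbox = ctx.Queue()   # the single shared queue: children -> main
    inboxes = {}           # PID -> private queue: main -> that child
    procs = []
    for _ in range(2):
        inbox = ctx.Queue()
        p = ctx.Process(target=worker, args=(inbox, outbox))
        p.start()
        inboxes[p.pid] = inbox
        procs.append(p)

    first, second = procs[0].pid, procs[1].pid
    inboxes[first].put(second)   # notify each child about its peer
    inboxes[second].put(first)

    reports = {}
    while len(reports) < len(procs):
        destination_pid, sender_pid, message = outbox.get()
        if destination_pid is None:
            reports[sender_pid] = message  # final report meant for main
        else:
            # Forward to the addressee's private queue.
            inboxes[destination_pid].put((sender_pid, message))

    for p in procs:
        p.join()
    return first, second, reports
```

Calling first, second, reports = route_demo() from the main process should leave reports[first] equal to (second, "hello") and reports[second] equal to (first, "hello"), since each child's greeting only reaches the other one by way of main.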