I am aware of multiprocessing.Manager() and how it can be used to create shared objects, in particular queues which can be shared between workers. There are several related questions on this, including one of my own.
However, I need to define a great many queues, each of which links a specific pair of processes. Say that each pair of processes and its linking queue is identified by the variable key.
I want to use a dictionary to access my queues when I need to put and get data. I cannot make this work. I've tried a number of things. With multiprocessing imported as mp:
Defining a dict like for key in all_keys: DICT[key] = mp.Queue() in a config file which is imported by the multiprocessing module (call it multi.py) does not return errors, but the queue DICT[key] is not shared between the processes; each one seems to have its own copy of the queue and thus no communication happens.
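Concretely, the config-file approach looks roughly like this (the file name, all_keys, and the example keys are placeholders for my real code):

# config.py -- sketch of the config file described above
import multiprocessing as mp

all_keys = ["a", "b", "c"]    # placeholder keys

DICT = {}
for key in all_keys:
    DICT[key] = mp.Queue()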
If I try to define the DICT at the beginning of the main multiprocessing function that defines the processes and starts them, like

DICT = mp.Manager().dict()
for key in all_keys:
    DICT[key] = mp.Queue()

I get the error

RuntimeError: Queue objects should only be shared between processes through inheritance
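For completeness, a self-contained version of that failing attempt (with placeholder keys) is:

import multiprocessing as mp

all_keys = ["a", "b", "c"]    # placeholder keys, standing in for my real ones

def main():
    DICT = mp.Manager().dict()
    for key in all_keys:
        DICT[key] = mp.Queue()    # this assignment is where the RuntimeError above appears

if __name__ == "__main__":
    main()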
Changing to

DICT = mp.Manager().dict()
for key in all_keys:
    DICT[key] = mp.Manager().Queue()

only makes everything worse. Trying similar definitions at the head of multi.py rather than inside the main function returns similar errors.
There must be a way to share many queues between processes without explicitly naming each one in the code. Any ideas?
Edit
Here is a basic schema of the program:
1 - load the first module, which defines some variables, imports multi, launches multi.main(), and loads another module which starts a cascade of module loads and code execution. Meanwhile...
2 - multi.main looks like this:

def main():
    manager = mp.Manager()
    pool = mp.Pool()
    DICT2 = manager.dict()
    for key in all_keys:
        DICT2[key] = manager.Queue()
        proc_1 = pool.apply_async(targ1, (DICT1[key],))   # DICT1 is defined in the config file
        proc_2 = pool.apply_async(targ2, (DICT2[key], otherargs,))
Rather than use pool and manager, I was also launching processes with the following:

mp.Process(target=targ1, args=(DICT[key],))
3 - The function targ1 takes input data that is coming in (sorted by key) from the main process. It is meant to pass the result to DICT[key] so targ2 can do its work (a simplified sketch of this per-key flow follows the list). This is the part that is not working. There are an arbitrary number of targ1s, targ2s, etc., and therefore an arbitrary number of queues.
4 - The results of some of these processes will be sent to a bunch of different arrays / pandas dataframes which are also indexed by key, and which I would like to be accessible from arbitrary processes, even ones launched in a different module. I have yet to write this part and it might be a different question. (I mention it here because the answer to 3 above might also solve 4 nicely.)
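To make item 3 above concrete, here is a simplified, single-key version of the flow I am after; targ1, targ2, and the string payloads are stand-ins for my real code, and the problem is scaling the middle queue to many keys through a dictionary:

import multiprocessing as mp

def targ1(q_in, q_link):
    # stage one for a single key: take input from main and hand the result to targ2
    data = q_in.get()
    q_link.put(data + " -> processed by targ1")

def targ2(q_link, q_out):
    # stage two for a single key: pick up targ1's result and report back to main
    result = q_link.get()
    q_out.put(result + " -> processed by targ2")

if __name__ == "__main__":
    # one key's pipeline, wired by hand; the real program needs one q_link per key
    q_in, q_link, q_out = mp.Queue(), mp.Queue(), mp.Queue()
    p1 = mp.Process(target=targ1, args=(q_in, q_link))
    p2 = mp.Process(target=targ2, args=(q_link, q_out))
    p1.start()
    p2.start()
    q_in.put("data for key 'a'")
    print(q_out.get())
    p1.join()
    p2.join()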
Passing messages to processes: a simple way to communicate between processes with multiprocessing is to use a Queue to pass messages back and forth. Any pickle-able object can pass through a Queue, and queues are thread and process safe (see https://docs.python.org/3/library/multiprocessing.html#exchanging-objects-between-processes). A short example of this pattern passes a single message to a single worker, then has the main process wait for the worker to finish. More generally, multiprocessing provides two ways of sharing state between processes: shared memory (suitable for simple values, arrays, or ctypes) and a Manager proxy, where one process holds the memory and a manager arbitrates access to it from other processes (even over a network).
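A minimal sketch of that single-worker exchange (with a throwaway worker function) might look like this:

import multiprocessing as mp

def worker(q):
    # receive one message from the main process
    print("Worker received:", q.get())

if __name__ == "__main__":
    q = mp.Queue()
    p = mp.Process(target=worker, args=(q,))
    p.start()
    q.put("hello from main")    # any pickle-able object can go through the queue
    p.join()                    # main waits for the worker to finish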
It sounds like your issues started when you tried to share a multiprocessing.Queue()
by passing it as an argument. You can get around this by creating a managed queue instead:
import multiprocessing
manager = multiprocessing.Manager()
passable_queue = manager.Queue()
When you use a manager to create it, you are storing and passing around a proxy to the queue, rather than the queue itself, so even when the object you pass to your worker processes is copied, it will still point at the same underlying data structure: your queue. It's very similar (in concept) to pointers in C/C++. If you create your queues this way, you will be able to pass them when you launch a worker process.
Since you can pass queues around now, you no longer need your dictionary to be managed. Keep a normal dictionary in main that will store all the mappings, and only give your worker processes the queues they need, so they won't need access to any mappings.
I've written an example of this here. It looks like you are passing objects between your workers, so that's what's done here. Imagine we have two stages of processing, and the data both starts and ends in the control of main. Look at how we can create the queues that connect the workers like a pipeline: by giving them only the queues they need, there's no need for them to know about any mappings:
import multiprocessing as mp

def stage1(q_in, q_out):
    q_out.put(q_in.get() + "Stage 1 did some work.\n")
    return

def stage2(q_in, q_out):
    q_out.put(q_in.get() + "Stage 2 did some work.\n")
    return

def main():
    pool = mp.Pool()
    manager = mp.Manager()

    # create managed queues
    q_main_to_s1 = manager.Queue()
    q_s1_to_s2 = manager.Queue()
    q_s2_to_main = manager.Queue()

    # launch workers, passing them the queues they need
    results_s1 = pool.apply_async(stage1, (q_main_to_s1, q_s1_to_s2))
    results_s2 = pool.apply_async(stage2, (q_s1_to_s2, q_s2_to_main))

    # Send a message into the pipeline
    q_main_to_s1.put("Main started the job.\n")

    # Wait for work to complete
    print(q_s2_to_main.get() + "Main finished the job.")

    pool.close()
    pool.join()
    return

if __name__ == "__main__":
    main()
The code produces this output:
Main started the job.
Stage 1 did some work.
Stage 2 did some work.
Main finished the job.
I didn't include an example of storing the queues or AsyncResult objects in dictionaries, because I still don't quite understand how your program is supposed to work. But now that you can pass your queues freely, you can build your dictionary to store the queue/process mappings as needed.
In fact, if you really do build a pipeline between multiple workers, you don't even need to keep a reference to the "inter-worker" queues in main. Create the queues, pass them to your workers, then only retain references to queues that main will use. I would definitely recommend trying to let old queues be garbage collected as quickly as possible if you really do have "an arbitrary number" of queues.
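As a rough sketch of how that might look with a key-indexed setup (all_keys, targ1, and targ2 are placeholders standing in for your actual names), main keeps only the endpoint queues in dictionaries and simply drops its references to the per-key inter-worker queues once the workers have them:

import multiprocessing as mp

def targ1(q_in, q_link):
    # stage one for one key: read input from main, pass a result to stage two
    q_link.put(q_in.get() + " -> targ1")

def targ2(q_link, q_out):
    # stage two for one key: read stage one's result, report back to main
    q_out.put(q_link.get() + " -> targ2")

def main():
    all_keys = ["a", "b", "c"]      # placeholder keys
    pool = mp.Pool()
    manager = mp.Manager()

    # main only keeps the queues it talks to directly
    to_stage1 = {}
    from_stage2 = {}

    for key in all_keys:
        q_in = manager.Queue()
        q_link = manager.Queue()    # inter-worker queue; main keeps no reference to it
        q_out = manager.Queue()
        pool.apply_async(targ1, (q_in, q_link))
        pool.apply_async(targ2, (q_link, q_out))
        to_stage1[key] = q_in
        from_stage2[key] = q_out

    for key in all_keys:
        to_stage1[key].put("data for " + key)
    for key in all_keys:
        print(from_stage2[key].get())

    pool.close()
    pool.join()

if __name__ == "__main__":
    main()

Here only to_stage1 and from_stage2 survive in main; the q_link proxies live on in the workers until those workers finish.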