Sharing a result queue among several processes

The documentation for the multiprocessing module shows how to pass a queue to a process started with multiprocessing.Process. But how can I share a queue with asynchronous worker processes started with apply_async? I don't need dynamic joining or anything else, just a way for the workers to (repeatedly) report their results back to base.

import multiprocessing

def worker(name, que):
    que.put("%d is done" % name)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    q = multiprocessing.Queue()
    workers = pool.apply_async(worker, (33, q))

This fails with: RuntimeError: Queue objects should only be shared between processes through inheritance. I understand what this means, and I understand the advice to inherit rather than require pickling/unpickling (and all the special Windows restrictions). But how do I pass the queue in a way that works? I can't find an example, and I've tried several alternatives that failed in various ways. Help please?

asked Mar 28 '12 by alexis

People also ask

What is multiprocessing Queue?

A queue is a data structure to which items are added with a call to put() and from which items are retrieved with a call to get(). multiprocessing.Queue provides a first-in, first-out (FIFO) queue, which means that items are retrieved from the queue in the order they were added.
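A minimal sketch of that FIFO behavior within a single process (the string values here are just illustrative):

```python
import multiprocessing

if __name__ == '__main__':
    q = multiprocessing.Queue()
    q.put("first")
    q.put("second")
    # Items come back in the order they were added (FIFO)
    print(q.get())  # first
    print(q.get())  # second
```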

How does multiprocessing event work?

A multiprocessing.Event object wraps a boolean flag that is either "set" (True) or "not set" (False). Processes sharing the event instance can check whether the event is set, set it, clear it (make it not set), or wait for it to be set.
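As a small sketch, one process can block in wait() until another sets the event (the function name waiter is made up for this example):

```python
import multiprocessing
import time

def waiter(event):
    # wait() blocks until another process calls event.set()
    event.wait()
    print("event was set")

if __name__ == '__main__':
    event = multiprocessing.Event()
    p = multiprocessing.Process(target=waiter, args=(event,))
    p.start()
    time.sleep(0.5)   # the child sits blocked in event.wait() meanwhile
    event.set()       # releases the waiting process
    p.join()
```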

How does multiprocessing pool work?

It works like a map-reduce architecture: it maps the input across the different worker processes and collects the output from all of them. After the tasks execute, it returns the output in the form of a list. It waits for all the tasks to finish before returning.
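A rough sketch of that map-style usage (the square function is just an example):

```python
import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    with multiprocessing.Pool(processes=3) as pool:
        # map() distributes the inputs across the workers and
        # gathers the results back into a list, preserving order
        results = pool.map(square, [1, 2, 3, 4])
    print(results)  # [1, 4, 9, 16]
```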


1 Answer

Try using multiprocessing.Manager to manage your queue and make it accessible to different workers. Unlike a plain multiprocessing.Queue, a managed queue's proxy object can be pickled, so it survives the trip through apply_async.

import multiprocessing

def worker(name, que):
    que.put("%d is done" % name)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    m = multiprocessing.Manager()
    q = m.Queue()
    workers = pool.apply_async(worker, (33, q))
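For completeness, a sketch of how the parent could actually read results back out of the managed queue after submitting several tasks (the wait-then-drain pattern here is one option, not the only one):

```python
import multiprocessing

def worker(name, que):
    que.put("%d is done" % name)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    m = multiprocessing.Manager()
    q = m.Queue()
    # Fire off several tasks that all report into the same queue
    results = [pool.apply_async(worker, (n, q)) for n in (33, 34, 35)]
    for r in results:
        r.get()           # blocks until done; re-raises worker exceptions
    while not q.empty():
        print(q.get())    # e.g. "33 is done" (completion order may vary)
    pool.close()
    pool.join()
```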
answered Sep 23 '22 by enderskill