
How do you pass a Queue reference to a function managed by pool.map_async()?

I want a long-running process to return its progress over a Queue (or something similar), which I will feed to a progress bar dialog. I also need the result when the process is completed. The test example below fails with RuntimeError: Queue objects should only be shared between processes through inheritance.

import multiprocessing, time

def task(args):
    count = args[0]
    queue = args[1]
    for i in xrange(count):
        queue.put("%d mississippi" % i)
    return "Done"

def main():
    q = multiprocessing.Queue()
    pool = multiprocessing.Pool()
    # fails here: a multiprocessing.Queue cannot be pickled and shipped to the workers
    result = pool.map_async(task, [(x, q) for x in range(10)])
    time.sleep(1)
    while not q.empty():
        print q.get()
    print result.get()

if __name__ == "__main__":
    main()

I've been able to get this to work using individual Process objects (where I am allowed to pass a Queue reference), but then I don't have a pool to manage the many processes I want to launch; a sketch of that working pattern is below. Any advice on a better pattern for this?
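For reference, a minimal sketch of that Process-based pattern (not code from the original post; written in Python 3):

import multiprocessing

def task(count, queue):
    # the Queue is inherited by the child process, so passing it here is allowed
    for i in range(count):
        queue.put("%d mississippi" % i)

if __name__ == "__main__":
    q = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=task, args=(x, q)) for x in range(10)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()  # fine for small payloads; drain the queue before joining for large ones
    while not q.empty():
        print(q.get())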

asked Jul 09 '10 by David

People also ask

What is pool in multiprocessing python?

The Python multiprocessing Pool can be used for parallel execution of a function across multiple input values, distributing the input data across processes (data parallelism). Below is a simple example.
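A minimal sketch (written for Python 3, unlike the Python 2 code in the question):

import multiprocessing

def square(x):
    return x * x

if __name__ == "__main__":
    with multiprocessing.Pool() as pool:
        # distributes range(10) across the worker processes
        print(pool.map(square, range(10)))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]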

How does Python multiprocessing queue work?

The multiprocessing.Queue provides a first-in, first-out (FIFO) queue: items are retrieved in the order they were added, so the first items put on the queue are the first items retrieved.
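A short Python 3 sketch of the FIFO behavior:

import multiprocessing

def producer(queue):
    for word in ("first", "second", "third"):
        queue.put(word)

if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=producer, args=(q,))
    p.start()
    for _ in range(3):
        print(q.get())  # prints "first", "second", "third", in insertion order
    p.join()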

How does multiprocessing pool work?

Pool allows multiple jobs per process, which can make it easier to parallelize your program. If you have a number of jobs to run in parallel, you can create a Pool with as many processes as you have CPU cores and then pass the list of jobs to pool.map.
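Sized explicitly, that looks like the following Python 3 sketch (note that Pool() already defaults to the number of CPUs, so processes= is shown only for illustration):

import multiprocessing

def work(job):
    return job * 2

if __name__ == "__main__":
    n = multiprocessing.cpu_count()  # one worker per CPU core
    with multiprocessing.Pool(processes=n) as pool:
        print(pool.map(work, range(100)))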


2 Answers

The following code seems to work:

import multiprocessing, time

def task(args):
    count = args[0]
    queue = args[1]
    for i in xrange(count):
        queue.put("%d mississippi" % i)
    return "Done"

def main():
    manager = multiprocessing.Manager()
    q = manager.Queue()  # a managed queue: its proxy can be pickled and sent to workers
    pool = multiprocessing.Pool()
    result = pool.map_async(task, [(x, q) for x in range(10)])
    time.sleep(1)
    while not q.empty():
        print q.get()
    print result.get()

if __name__ == "__main__":
    main()

Note that the queue comes from manager.Queue() rather than multiprocessing.Queue(); a managed queue is a proxy object, which can be pickled and sent to the pool's worker processes. Thanks Alex for pointing me in this direction.
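The time.sleep(1) in the example is only there so the demo has output to drain; for a real progress dialog you would typically poll the queue while waiting on the AsyncResult. A Python 3 sketch of that loop (the structure is my suggestion, not code from the original answer):

import multiprocessing, queue

def task(args):
    count, q = args
    for i in range(count):
        q.put("%d mississippi" % i)
    return "Done"

def main():
    manager = multiprocessing.Manager()
    q = manager.Queue()
    with multiprocessing.Pool() as pool:
        result = pool.map_async(task, [(x, q) for x in range(10)])
        while not result.ready():          # poll until every task has finished
            try:
                print(q.get(timeout=0.1))  # feed this to the progress bar instead
            except queue.Empty:
                pass
        while not q.empty():               # drain any remaining progress messages
            print(q.get())
        print(result.get())

if __name__ == "__main__":
    main()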

answered by David


Making q global works...:

import multiprocessing, time

q = multiprocessing.Queue()  # global: inherited by the pool's worker processes at fork

def task(count):
    for i in xrange(count):
        q.put("%d mississippi" % i)
    return "Done"

def main():
    pool = multiprocessing.Pool()
    result = pool.map_async(task, range(10))
    time.sleep(1)
    while not q.empty():
        print q.get()
    print result.get()

if __name__ == "__main__":
    main()

If you need multiple queues, e.g. to avoid mixing up the progress of the various pool processes, a global list of queues should work (of course, each process will then need to know what index in the list to use, but that's OK to pass as an argument ;-).
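A Python 3 sketch of that multi-queue variant (the queue count and message format here are made up for illustration). Note that the global-queue trick relies on the workers inheriting the module globals, i.e. the fork start method that is the default on Linux; under spawn (Windows, and macOS since Python 3.8) each worker would re-create its own unrelated queues:

import multiprocessing

NUM_TASKS = 4
STEPS = 3
queues = [multiprocessing.Queue() for _ in range(NUM_TASKS)]  # one queue per task

def task(index):
    q = queues[index]  # each task reports progress on its own queue
    for i in range(STEPS):
        q.put("task %d: step %d" % (index, i))
    return "Done %d" % index

def main():
    pool = multiprocessing.Pool()
    result = pool.map_async(task, range(NUM_TASKS))
    for index, q in enumerate(queues):
        for _ in range(STEPS):
            print(q.get())  # blocks until each progress message arrives
    print(result.get())

if __name__ == "__main__":
    main()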

answered by Alex Martelli