 

Can I use a multiprocessing Queue in a function called by Pool.imap?

I'm using Python 2.7 and trying to run some CPU-heavy tasks in their own processes. I would like to be able to send messages back to the parent process to keep it informed of the current status. The multiprocessing Queue seems perfect for this, but I can't figure out how to get it to work.

So, this is my basic working example minus the use of a Queue.

import multiprocessing as mp
import time

def f(x):
    return x*x

def main():
    pool = mp.Pool()
    results = pool.imap_unordered(f, range(1, 6))
    time.sleep(1)

    print str(results.next())

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

I've tried passing the Queue in several ways, and they all fail with the error "RuntimeError: Queue objects should only be shared between processes through inheritance". Here is one of the ways I tried, based on an earlier answer I found. (I get the same problem with Pool.map_async and Pool.imap.)

import multiprocessing as mp
import time

def f(args):
    x = args[0]
    q = args[1]
    q.put(str(x))
    time.sleep(0.1)
    return x*x

def main():
    q = mp.Queue()
    pool = mp.Pool()
    results = pool.imap_unordered(f, ([i, q] for i in range(1, 6)))

    print str(q.get())

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

Finally, the last-ditch approach (making the queue a global) doesn't produce any messages; it just locks up.

import multiprocessing as mp
import time

q = mp.Queue()

def f(x):
    q.put(str(x))
    return x*x

def main():
    pool = mp.Pool()
    results = pool.imap_unordered(f, range(1, 6))
    time.sleep(1)

    print q.get()

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

I'm aware that it will probably work with multiprocessing.Process directly and that there are other libraries to accomplish this, but I hate to back away from the standard library functions that are a great fit until I'm sure it's not just my lack of knowledge keeping me from being able to exploit them.
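For reference, here is what "works with multiprocessing.Process directly" means: a child created that way inherits the queue, so passing it in args is allowed. A minimal sketch in Python 3 syntax (the worker name and status messages are illustrative, not from the original question):

```python
import multiprocessing as mp

def worker(x, q):
    # Report status back to the parent, then do the work.
    q.put('starting %d' % x)
    result = x * x
    q.put(('done', x, result))

if __name__ == '__main__':
    q = mp.Queue()
    procs = [mp.Process(target=worker, args=(i, q)) for i in range(1, 4)]
    for p in procs:
        p.start()
    # Drain the six status messages (two per child) before joining.
    for _ in range(6):
        print(q.get())
    for p in procs:
        p.join()
```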

Thanks.

Olson asked Sep 30 '10


People also ask

When would you use a multiprocessing pool?

Python multiprocessing Pool can be used for parallel execution of a function across multiple input values, distributing the input data across processes (data parallelism).

How do processes pools work in multiprocessing?

Pool allows multiple jobs per process, which may make it easier to parallelize your program. If you have a number of jobs to run in parallel, you can create a Pool with as many processes as there are CPU cores and then pass the list of jobs to pool.map.
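That sizing advice might look like this (a Python 3 sketch; note that mp.Pool() with no argument already defaults to the CPU count):

```python
import multiprocessing as mp
import os

def work(n):
    # Stand-in for a CPU-heavy task.
    return n * n

if __name__ == '__main__':
    # One worker per CPU core, then map the whole job list at once.
    with mp.Pool(processes=os.cpu_count()) as pool:
        print(pool.map(work, range(8)))  # [0, 1, 4, 9, 16, 25, 36, 49]
```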

What is the difference between pool and process in multiprocessing?

While the Process class keeps all spawned processes in memory, Pool keeps only those currently executing. So if you have a large number of tasks, and each carries a lot of data, using the Process class directly can waste a lot of memory. The overhead of creating a Pool, however, is higher.

Is multiprocessing queue process safe?

Yes, it is. From https://docs.python.org/3/library/multiprocessing.html#exchanging-objects-between-processes: Queues are thread and process safe.
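As a quick illustration of that process safety, several workers can put onto one queue concurrently and every message arrives intact (a Python 3 sketch; the producer function is hypothetical):

```python
import multiprocessing as mp

def producer(q, worker_id):
    # Four processes write to the same queue at the same time.
    for i in range(100):
        q.put((worker_id, i))

if __name__ == '__main__':
    q = mp.Queue()
    procs = [mp.Process(target=producer, args=(q, w)) for w in range(4)]
    for p in procs:
        p.start()
    # Drain before joining, so no child blocks on a full pipe.
    items = [q.get() for _ in range(400)]
    for p in procs:
        p.join()
    print(len(items))  # 400: nothing lost or duplicated
```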


1 Answer

The trick is to pass the Queue as an argument to the Pool's initializer. This appears to work with all the Pool dispatch methods.

import multiprocessing as mp

def f(x):
    f.q.put('Doing: ' + str(x))
    return x*x

def f_init(q):
    f.q = q

def main():
    jobs = range(1, 6)

    q = mp.Queue()
    p = mp.Pool(None, f_init, [q])
    results = p.imap(f, jobs)
    p.close()

    for i in range(len(jobs)):
        print q.get()
        print results.next()

if __name__ == '__main__':
    main()
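For anyone on Python 3, the same initializer trick looks like this (only the print calls and iterator syntax change; the Pool signature is the same). The initializer runs once in each worker process, so stashing the inherited queue as an attribute on f makes it visible to every later call of f in that worker:

```python
import multiprocessing as mp

def f(x):
    f.q.put('Doing: ' + str(x))
    return x * x

def f_init(q):
    # Runs once per worker process; stash the queue where f can see it.
    f.q = q

if __name__ == '__main__':
    jobs = range(1, 6)
    q = mp.Queue()
    pool = mp.Pool(None, f_init, [q])
    results = pool.imap(f, jobs)
    pool.close()
    for _ in jobs:
        print(q.get())
        print(next(results))
    pool.join()
```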
Olson answered Sep 21 '22