 

Dead simple example of using Multiprocessing Queue, Pool and Locking

I tried to read the documentation at http://docs.python.org/dev/library/multiprocessing.html, but I'm still struggling with multiprocessing's Queue, Pool, and Lock. For now, I was able to build the example below.

Regarding Queue and Pool, I'm not sure I've understood the concepts correctly, so correct me if I'm wrong. What I'm trying to achieve is to process 2 requests at a time (the data list has 8 entries in this example). So what should I use: a Pool to create 2 processes that can handle two different queues (2 at max), or should I just use Queue to process 2 inputs each time? The lock would be there to print the outputs correctly.

import multiprocessing
import time

data = (['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
        ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7'])

def mp_handler(var1):
    for indata in var1:
        p = multiprocessing.Process(target=mp_worker, args=(indata[0], indata[1]))
        p.start()

def mp_worker(inputs, the_time):
    print " Process %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print " Process %s\tDONE" % inputs

if __name__ == '__main__':
    mp_handler(data)
asked Jan 02 '14 by thclpr


People also ask

When would you use a multiprocessing pool?

Multiprocessing is essential when a long-running process has to be sped up or multiple processes have to execute in parallel. Executing a process on a single core confines its capability, which could otherwise be spread across multiple cores.

What is the difference between pool and process in multiprocessing?

While Process keeps all the processes in memory, Pool keeps only those that are under execution. So if you have a large number of tasks, and they carry a lot of data, using the Process class may waste a lot of memory. The overhead of creating a Pool, on the other hand, is higher.
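To make the contrast concrete, here's a minimal sketch of both APIs (the square function and the numbers are just placeholders, not from the question):

import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    # Process: you create, start, and join each worker yourself
    p = multiprocessing.Process(target=square, args=(3,))
    p.start()
    p.join()   # note: the return value is discarded; capturing it needs a Queue or Pipe

    # Pool: a fixed number of workers; results are collected for you
    pool = multiprocessing.Pool(2)
    print(pool.map(square, [1, 2, 3, 4]))   # [1, 4, 9, 16]
    pool.close()
    pool.join()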

What is a multiprocessing queue?

A queue is a data structure to which items can be added by a call to put() and from which items can be retrieved by a call to get(). multiprocessing.Queue provides a first-in, first-out (FIFO) queue, which means that items are retrieved in the order they were added.
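A minimal sketch of that put()/get() round trip (the producer function and the items are illustrative):

import multiprocessing

def producer(q):
    for item in ['a', 'b', 'c']:
        q.put(item)   # items are added at the back of the queue

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=producer, args=(q,))
    p.start()
    for _ in range(3):
        print(q.get())   # retrieved in insertion order: a, b, c
    p.join()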


1 Answer

The best solution for your problem is to use a Pool. Using Queues and a separate "queue feeding" function is probably overkill.

Here's a slightly rearranged version of your program, this time with only 2 processes corralled in a Pool. I believe it's the easiest way to go, with minimal changes to the original code:

import multiprocessing
import time

data = (
    ['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
    ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)

def mp_worker((inputs, the_time)):
    print " Process %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print " Process %s\tDONE" % inputs

def mp_handler():
    p = multiprocessing.Pool(2)   # a pool of 2 worker processes
    p.map(mp_worker, data)

if __name__ == '__main__':
    mp_handler()

Note that the mp_worker() function now accepts a single argument (a tuple of the two previous arguments), because map() passes each item of your input data as a single argument to the worker function (it only chunks the iterable internally, for transport between processes).

Output:

Process a   Waiting 2 seconds
Process b   Waiting 4 seconds
Process a   DONE
Process c   Waiting 6 seconds
Process b   DONE
Process d   Waiting 8 seconds
Process c   DONE
Process e   Waiting 1 seconds
Process e   DONE
Process f   Waiting 3 seconds
Process d   DONE
Process g   Waiting 5 seconds
Process f   DONE
Process h   Waiting 7 seconds
Process g   DONE
Process h   DONE
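A side note beyond the original answer: the def mp_worker((inputs, the_time)) signature relies on tuple parameter unpacking, which was removed in Python 3 (PEP 3113). On Python 3 the same program would unpack inside the worker, roughly like this:

import multiprocessing
import time

data = (
    ['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
    ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)

def mp_worker(item):
    inputs, the_time = item   # unpack explicitly; signatures can't do it in Python 3
    print(" Process %s\tWaiting %s seconds" % (inputs, the_time))
    time.sleep(int(the_time))
    print(" Process %s\tDONE" % inputs)

def mp_handler():
    with multiprocessing.Pool(2) as p:   # Pool is a context manager since Python 3.3
        p.map(mp_worker, data)

if __name__ == '__main__':
    mp_handler()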

Edit, as per @Thales' comment below:

If you want "a lock for each pool limit", so that your processes run in tandem pairs, à la:

A waiting, B waiting | A done, B done | C waiting, D waiting | C done, D done | ...

then change the handler function to launch pools (of 2 processes) for each pair of data:

def mp_handler():
    subdata = zip(data[0::2], data[1::2])
    for task1, task2 in subdata:
        p = multiprocessing.Pool(2)
        p.map(mp_worker, (task1, task2))

Now your output is:

 Process a Waiting 2 seconds
 Process b Waiting 4 seconds
 Process a  DONE
 Process b  DONE
 Process c Waiting 6 seconds
 Process d Waiting 8 seconds
 Process c  DONE
 Process d  DONE
 Process e Waiting 1 seconds
 Process f Waiting 3 seconds
 Process e  DONE
 Process f  DONE
 Process g Waiting 5 seconds
 Process h Waiting 7 seconds
 Process g  DONE
 Process h  DONE
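One design note beyond the original answer: the handler above creates (and never closes) a fresh Pool on every iteration. A sketch that reuses a single Pool for the same tandem-pair behavior, assuming the data and mp_worker from above:

def mp_handler():
    p = multiprocessing.Pool(2)   # one pool, reused for every pair
    for pair in zip(data[0::2], data[1::2]):
        p.map(mp_worker, pair)    # map() blocks, so each pair finishes before the next starts
    p.close()
    p.join()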
answered Oct 07 '22 by Velimir Mlaker