
Python Pool with worker Processes

I am trying to use a worker Pool in python using Process objects. Each worker (a Process) does some initialization (takes a non-trivial amount of time), gets passed a series of jobs (ideally using map()), and returns something. No communication is necessary beyond that. However, I can't seem to figure out how to get map() to call my worker's compute() function.

from multiprocessing import Pool, Process

class Worker(Process):
    def __init__(self):
        print('Worker started')
        # do some initialization here
        super(Worker, self).__init__()

    def compute(self, data):
        print('Computing things!')
        return data * data

if __name__ == '__main__':
    # This works fine
    worker = Worker()
    print(worker.compute(3))

    # workers get initialized fine
    pool = Pool(processes=4,
                initializer=Worker)
    data = range(10)
    # How to use my worker pool?
    result = pool.map(compute, data)

Is a job queue the way to go instead, or can I use map()?

Felix asked Jan 27 '12 19:01




3 Answers

I would suggest that you use a Queue for this.

from multiprocessing import Process, Queue

class Worker(Process):
    def __init__(self, queue):
        super(Worker, self).__init__()
        self.queue = queue

    def run(self):
        print('Worker started')
        # do some initialization here

        print('Computing things!')
        for data in iter(self.queue.get, None):
            pass  # Use data

Now you can start a pile of these, all getting work from a single queue:

request_queue = Queue()
for i in range(4):
    Worker(request_queue).start()
for data in the_real_source:
    request_queue.put(data)
# Sentinel objects to allow clean shutdown: 1 per worker.
for i in range(4):
    request_queue.put(None)

That kind of thing should allow you to amortize the expensive startup cost across multiple workers.

S.Lott answered Oct 05 '22 23:10


initializer expects an arbitrary callable that performs initialization (e.g., it can set some globals), not a Process subclass; map accepts an arbitrary iterable:

#!/usr/bin/env python
import multiprocessing as mp

def init(val):
    print('do some initialization here')

def compute(data):
    print('Computing things!')
    return data * data

def produce_data():
    yield -100
    for i in range(10):
        yield i
    yield 100

if __name__ == "__main__":
    p = mp.Pool(initializer=init, initargs=('arg',))
    print(p.map(compute, produce_data()))
jfs answered Oct 06 '22 01:10


Since Python 3.3 you can use starmap, which passes multiple arguments to the worker function AND returns the results, with a very simple syntax:

import multiprocessing

nb_cores = multiprocessing.cpu_count()

def caps(nb, letter):
    print('Exec nb:', nb)
    return letter.upper()

if __name__ == '__main__':

    multiprocessing.freeze_support()  # needed on Windows; must be inside the if __name__ == '__main__' block

    input_data = ['a','b','c','d','e','f','g','h']
    input_order = [1, 2, 3, 4, 5, 6, 7, 8]  # same length as input_data

    with multiprocessing.Pool(processes=nb_cores) as pool: # auto closing workers
        results = pool.starmap(caps, zip(input_order, input_data))

    print(results)
Charly Empereur-mot answered Oct 06 '22 00:10