I am trying to use a worker Pool in Python with Process objects. Each worker (a Process) does some initialization (which takes a non-trivial amount of time), gets passed a series of jobs (ideally using map()), and returns something. No communication is necessary beyond that. However, I can't seem to figure out how to use map() to call my worker's compute() function.
from multiprocessing import Pool, Process

class Worker(Process):
    def __init__(self):
        print 'Worker started'
        # do some initialization here
        super(Worker, self).__init__()

    def compute(self, data):
        print 'Computing things!'
        return data * data

if __name__ == '__main__':
    # This works fine
    worker = Worker()
    print worker.compute(3)

    # workers get initialized fine
    pool = Pool(processes=4,
                initializer=Worker)
    data = range(10)
    # How to use my worker pool?
    result = pool.map(compute, data)
Is a job queue the way to go instead, or can I use map()?
I would suggest that you use a Queue for this.
from multiprocessing import Process, Queue

class Worker(Process):
    def __init__(self, queue):
        super(Worker, self).__init__()
        self.queue = queue

    def run(self):
        print('Worker started')
        # do some initialization here

        print('Computing things!')
        for data in iter(self.queue.get, None):
            # Use data
            pass
Now you can start a pile of these, all getting work from a single queue:
request_queue = Queue()
for i in range(4):
    Worker(request_queue).start()
for data in the_real_source:
    request_queue.put(data)
# Sentinel objects to allow clean shutdown: 1 per worker.
for i in range(4):
    request_queue.put(None)
That kind of thing should allow you to amortize the expensive startup cost across multiple workers.
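If you also need results back from the workers (the question says each job "returns something"), the same pattern extends to a second queue that the workers write into. The sketch below is only an illustration of that idea; the task_queue/result_queue names and the data * data computation are placeholders, not part of the answer above:

from multiprocessing import Process, Queue

class Worker(Process):
    def __init__(self, task_queue, result_queue):
        super(Worker, self).__init__()
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        # do some (expensive) initialization here, once per worker
        for data in iter(self.task_queue.get, None):
            self.result_queue.put(data * data)  # placeholder computation

if __name__ == '__main__':
    tasks, results = Queue(), Queue()
    workers = [Worker(tasks, results) for _ in range(4)]
    for w in workers:
        w.start()
    data = range(10)
    for item in data:
        tasks.put(item)
    for _ in workers:      # one sentinel per worker for clean shutdown
        tasks.put(None)
    output = [results.get() for _ in data]  # order may differ from input order
    for w in workers:
        w.join()
    print(output)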
initializer expects an arbitrary callable that does initialization, e.g., it can set some globals; it is not meant to be a Process subclass. map() accepts an arbitrary iterable:
#!/usr/bin/env python
import multiprocessing as mp

def init(val):
    print('do some initialization here')

def compute(data):
    print('Computing things!')
    return data * data

def produce_data():
    yield -100
    for i in range(10):
        yield i
    yield 100

if __name__ == "__main__":
    p = mp.Pool(initializer=init, initargs=('arg',))
    print(p.map(compute, produce_data()))
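To make the "set some globals" point concrete for the question's expensive per-worker setup: the initializer can build the costly state once in each worker process and stash it in a module-level variable that compute() then reads. A minimal sketch, where expensive_setup and _state are invented names for illustration:

import multiprocessing as mp

_state = None  # each worker process gets its own copy

def expensive_setup(arg):
    # pretend this takes a long time (load a model, open a connection, ...)
    return {'offset': arg}

def init(arg):
    global _state
    _state = expensive_setup(arg)  # runs once in each worker process

def compute(data):
    # reads the per-worker state built by init()
    return data * data + _state['offset']

if __name__ == '__main__':
    with mp.Pool(processes=4, initializer=init, initargs=(100,)) as pool:
        print(pool.map(compute, range(10)))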
Since Python 3.3 you can use starmap, which also lets you pass multiple arguments and get the results back with a very simple syntax:
import multiprocessing

nb_cores = multiprocessing.cpu_count()

def caps(nb, letter):
    print('Exec nb:', nb)
    return letter.upper()

if __name__ == '__main__':
    multiprocessing.freeze_support()  # needed on Windows; must be inside the if __name__ == '__main__' block

    input_data = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']
    input_order = [1, 2, 3, 4, 5, 6, 7, 8, 9]

    with multiprocessing.Pool(processes=nb_cores) as pool:  # the with-block closes the workers automatically
        results = pool.starmap(caps, zip(input_order, input_data))

    print(results)
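As a side note, not part of the answer above: when only one argument varies per task and the rest are fixed, an alternative to starmap is to pin the fixed arguments with functools.partial and use plain map. A small sketch with an invented scale() function:

import multiprocessing
from functools import partial

def scale(factor, value):
    # 'factor' is fixed for the whole run, 'value' varies per task
    return factor * value

if __name__ == '__main__':
    with multiprocessing.Pool() as pool:
        # partial() pins factor=10, so map() only has to supply 'value'
        results = pool.map(partial(scale, 10), range(8))
    print(results)  # [0, 10, 20, ..., 70]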