I am having trouble with the multiprocessing module. I am using a Pool of workers with its map method to concurrently analyze lots of files. Each time a file has been processed I would like to have a counter updated so that I can keep track of how many files remain to be processed. Here is sample code:
import os
import multiprocessing

counter = 0

def analyze(file):
    # Analyze the file.
    global counter
    counter += 1
    print counter

if __name__ == '__main__':
    files = os.listdir('/some/directory')
    pool = multiprocessing.Pool(4)
    pool.map(analyze, files)
I cannot find a solution for this.
The problem is that the counter variable is not shared between your processes: each separate process creates its own local instance and increments that.
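You can see this for yourself with a minimal sketch (the file names here are made up) that prints the process id next to the counter: each worker reports its own small local count, and the parent's counter is still 0 after pool.map returns.

import os
import multiprocessing

counter = 0

def analyze(f):
    # Each worker process gets its own copy of this module-level variable,
    # so its increments never reach the parent or the other workers.
    global counter
    counter += 1
    print('pid %d: local counter = %d (file %s)' % (os.getpid(), counter, f))

if __name__ == '__main__':
    files = ['a.txt', 'b.txt', 'c.txt', 'd.txt']  # dummy input
    pool = multiprocessing.Pool(4)
    pool.map(analyze, files)
    print('parent counter is still %d' % counter)  # prints 0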
See this section of the documentation for some techniques you can employ to share state between your processes. In your case, you might want to share a Value instance between your workers.
Here's a working version of your example (with some dummy input data). Note that it uses global values, which I would really try to avoid in practice:
from multiprocessing import Pool, Value
from time import sleep

counter = None

def init(args):
    ''' store the counter for later use '''
    global counter
    counter = args

def analyze_data(args):
    ''' increment the global counter, do something with the input '''
    global counter
    # += operation is not atomic, so we need to get a lock:
    with counter.get_lock():
        counter.value += 1
    print counter.value
    return args * 10

if __name__ == '__main__':
    #inputs = os.listdir(some_directory)
    #
    # initialize a cross-process counter and the input lists
    #
    counter = Value('i', 0)
    inputs = [1, 2, 3, 4]
    #
    # create the pool of workers, ensuring each one receives the counter
    # as it starts.
    #
    p = Pool(initializer=init, initargs=(counter,))
    i = p.map_async(analyze_data, inputs, chunksize=1)
    i.wait()
    print i.get()
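If all you need the count for is progress reporting, one alternative worth considering (this is my own sketch, not part of the Value approach above) is to skip shared state entirely and let the parent process keep the tally as results come back, for example with Pool.imap_unordered:

from multiprocessing import Pool

def analyze_data(args):
    # placeholder for the real per-file work
    return args * 10

if __name__ == '__main__':
    inputs = [1, 2, 3, 4]
    total = len(inputs)
    results = []
    pool = Pool(4)
    # imap_unordered yields results as workers finish, so the parent can
    # count completed tasks itself and no cross-process counter is needed.
    for done, result in enumerate(pool.imap_unordered(analyze_data, inputs), 1):
        results.append(result)
        print('%d of %d processed, %d remaining' % (done, total, total - done))
    pool.close()
    pool.join()
    print(results)

The trade-off is that the results arrive in completion order rather than input order, so reorder them afterwards if that matters for your analysis.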