 

Python multiprocessing and a shared counter

I am having trouble with the multiprocessing module. I am using a Pool of workers with its map method to analyze lots of files concurrently. Each time a file has been processed I would like to update a counter so that I can keep track of how many files remain to be processed. Here is some sample code:

import os
import multiprocessing

counter = 0

def analyze(file):
    # Analyze the file.
    global counter
    counter += 1
    print(counter)

if __name__ == '__main__':
    files = os.listdir('/some/directory')
    pool = multiprocessing.Pool(4)
    pool.map(analyze, files)

I cannot find a solution for this.

asked Jan 17 '10 by Davide

1 Answer

The problem is that the counter variable is not shared between your processes: each process creates its own local copy and increments that.
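
You can see this for yourself by printing the worker's process ID next to the counter. A minimal sketch (the function name here is mine, for illustration):

import os
from multiprocessing import Pool

counter = 0

def show_local_counter(_):
    # Each worker increments and prints its own module-level counter.
    global counter
    counter += 1
    print('pid %d sees counter = %d' % (os.getpid(), counter))

if __name__ == '__main__':
    with Pool(4) as pool:
        pool.map(show_local_counter, range(8))
    # The parent's counter is still 0: no worker increment reached it.
    print('parent counter =', counter)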

See the "Sharing state between processes" section of the multiprocessing documentation for some techniques you can employ to share state between your processes. In your case you might want to share a Value instance between your workers.
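
For reference, a Value wraps a ctypes object in shared memory and carries its own lock, which you must hold around read-modify-write operations like +=. The pattern in isolation (a minimal sketch, single process only):

from multiprocessing import Value

# 'i' is the ctypes typecode for a signed int; 0 is the initial value.
shared = Value('i', 0)

# get_lock() returns the lock guarding the shared memory; += is a
# read-modify-write, so it must run while the lock is held.
with shared.get_lock():
    shared.value += 1

print(shared.value)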

Here's a working version of your example (with some dummy input data). Note that it uses global values, which I would really try to avoid in practice:

from multiprocessing import Pool, Value

counter = None

def init(args):
    ''' store the counter for later use '''
    global counter
    counter = args

def analyze_data(args):
    ''' increment the global counter, do something with the input '''
    global counter
    # += operation is not atomic, so we need to get a lock:
    with counter.get_lock():
        counter.value += 1
    print(counter.value)
    return args * 10

if __name__ == '__main__':
    # inputs = os.listdir(some_directory)

    # initialize a cross-process counter and the input list
    counter = Value('i', 0)
    inputs = [1, 2, 3, 4]

    # create the pool of workers, ensuring each one receives the counter
    # as it starts
    p = Pool(initializer=init, initargs=(counter,))
    i = p.map_async(analyze_data, inputs, chunksize=1)
    i.wait()
    print(i.get())
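If all you need is a progress count in the parent process, you can also avoid shared state entirely: imap_unordered yields each result as soon as a worker finishes, so the parent can do the counting itself. A sketch of that alternative (not from the original answer; analyze here is a stand-in for the real work):

from multiprocessing import Pool

def analyze(path):
    # ... analyze the file here ...
    return path

if __name__ == '__main__':
    files = ['a.txt', 'b.txt', 'c.txt', 'd.txt']
    with Pool(4) as pool:
        done = 0
        # results arrive in completion order, one per finished task
        for _ in pool.imap_unordered(analyze, files):
            done += 1
            print('%d of %d files processed' % (done, len(files)))
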
answered Sep 25 '22 by jkp