Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Python multiprocessing - tracking the process of pool.map operation

I have a function which performs some simulation and returns an array in string format.

I want to run the simulation (the function) for varying input parameter values, over 10000 possible input values, and write the results to a single file.

I am using multiprocessing, specifically, pool.map function to run the simulations in parallel.

Since the whole process of running the simulation function over 10000 times takes a very long time, I really would like to track the process of the entire operation.

I think the problem in my current code below is that, pool.map runs the function 10000 times, without any process tracking during those operations. Once the parallel processing finishes running 10000 simulations (could be hours to days.), then I keep tracking when 10000 simulation results are being saved to a file..So this is not really tracking the processing of pool.map operation.

Is there an easy fix to my code that will allow process tracking?

def simFunction(input):
    # Does some simulation and outputs simResult
    return str(simResult)

# Parallel processing

inputs = np.arange(0,10000,1)

if __name__ == "__main__":
    numCores = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes = numCores)
    t = pool.map(simFunction, inputs) 
    with open('results.txt','w') as out:
        print("Starting to simulate " + str(len(inputs)) + " input values...")
        counter = 0
        for i in t:
            out.write(i + '\n')
            counter = counter + 1
            if counter%100==0:
                print(str(counter) + " of " + str(len(inputs)) + " input values simulated")
    print('Finished!!!!')
like image 636
user32147 Avatar asked Feb 06 '15 21:02

user32147


People also ask

How do processes pools work in multiprocessing?

Pool is generally used for heterogeneous tasks, whereas multiprocessing. Process is generally used for homogeneous tasks. The Pool is designed to execute heterogeneous tasks, that is tasks that do not resemble each other. For example, each task submitted to the process pool may be a different target function.

What is multiprocessing pool map?

The multiprocessing. pool. Pool process pool provides a version of the map() function where the target function is called for each item in the provided iterable in parallel. A parallel equivalent of the map() built-in function […]. It blocks until the result is ready.

How does pool map work Python?

The pool's map method chops the given iterable into a number of chunks which it submits to the process pool as separate tasks. The pool's map is a parallel equivalent of the built-in map method. The map blocks the main execution until all computations finish. The Pool can take the number of processes as a parameter.

What does multiprocessing pool do in Python?

Python multiprocessing Pool can be used for parallel execution of a function across multiple input values, distributing the input data across processes (data parallelism).


2 Answers

There is no "easy fix". map is all about hiding implementation details from you. And in this case you want details. That is, things become a little more complex, by definition. You need to change the communication paradigm. There are many ways to do so.

One is: create a Queue for collecting your results, and let your workers put results into this queue. You can then, from within a monitoring thread or process, look at the queue, and consume the results as they are coming in. While consuming, you can analyze them, and generate log output. This might be the most general way to keep track of progress: you can respond to incoming results in any way, in real time.

A more simple way might be to slightly modify your worker function, and generate log output in there. By carefully analyzing the log output with external tools (such as grep and wc), you can come up with very simple means of keeping track.

like image 98
Dr. Jan-Philip Gehrcke Avatar answered Oct 02 '22 18:10

Dr. Jan-Philip Gehrcke


Note that I'm using pathos.multiprocessing instead of multiprocessing. It's just a fork of multiprocessing that enables you to do map functions with multiple inputs, has much better serialization, and allows you to execute map calls anywhere (not just in __main__). You could use multiprocessing to do the below as well, however the code would be very slightly different.

If you use an iterated map function, it's pretty easy to keep track of progress.

from pathos.multiprocessing import ProcessingPool as Pool
def simFunction(x,y):
  import time
  time.sleep(2)
  return x**2 + y
 
x,y = range(100),range(-100,100,2)
res = Pool().imap(simFunction, x,y)
with open('results.txt', 'w') as out:
  for i in x:
    out.write("%s\n" % res.next())
    if i%10 is 0:
      print "%s of %s simulated" % (i, len(x))
0 of 100 simulated
10 of 100 simulated
20 of 100 simulated
30 of 100 simulated
40 of 100 simulated
50 of 100 simulated
60 of 100 simulated
70 of 100 simulated
80 of 100 simulated
90 of 100 simulated

Or, you can use an asynchronous map. Here I'll do things a little differently, just to mix it up.

import time
res = Pool().amap(simFunction, x,y)
while not res.ready():
  print "waiting..."
  time.sleep(5)
 
waiting...
waiting...
waiting...
waiting...
res.get()
[-100, -97, -92, -85, -76, -65, -52, -37, -20, -1, 20, 43, 68, 95, 124, 155, 188, 223, 260, 299, 340, 383, 428, 475, 524, 575, 628, 683, 740, 799, 860, 923, 988, 1055, 1124, 1195, 1268, 1343, 1420, 1499, 1580, 1663, 1748, 1835, 1924, 2015, 2108, 2203, 2300, 2399, 2500, 2603, 2708, 2815, 2924, 3035, 3148, 3263, 3380, 3499, 3620, 3743, 3868, 3995, 4124, 4255, 4388, 4523, 4660, 4799, 4940, 5083, 5228, 5375, 5524, 5675, 5828, 5983, 6140, 6299, 6460, 6623, 6788, 6955, 7124, 7295, 7468, 7643, 7820, 7999, 8180, 8363, 8548, 8735, 8924, 9115, 9308, 9503, 9700, 9899]

Either an iterated or asynchronous map will enable you to write whatever code you want to do better process tracking. For example, pass a unique "id" to each job, and watch which come back, or have each job return it's process id. There are lots of ways to track progress and processes… but the above should give you a start.

You can get pathos here.

like image 31
Mike McKerns Avatar answered Oct 02 '22 19:10

Mike McKerns