
Print progress of pool.map_async

I have the following functions:

import itertools
from multiprocessing import Pool

def do_comparison(tupl):
    x, y = tupl # unpack arguments
    return compare_clusters(x, y)

def distance_matrix(clusters, condensed=False):
    pool = Pool()
    values = pool.map_async(do_comparison, itertools.combinations(clusters, 2)).get()
    # do stuff with values

Is it possible to print the progress of pool.map_async(do_comparison, itertools.combinations(clusters, 2)).get()? I tried adding a counter to do_comparison, like so:

count = 0
def do_comparison(tupl):
    global count
    count += 1
    if count % 1000 == 0:
        print(count)
    x, y = tupl # unpack arguments
    return compare_clusters(x, y)

But aside from it not looking like a good solution, the numbers don't print until the end of the script. Is there a good way to do this?

Niek de Klein asked Oct 24 '13 10:10


2 Answers

I track progress as follows:

import multiprocessing
import time


class PoolProgress:
    def __init__(self, pool, update_interval=3):
        self.pool = pool
        self.update_interval = update_interval

    def track(self, job):
        # Relies on private attributes of Pool and its result object,
        # which may change between Python versions.
        task = self.pool._cache[job._job]
        # _number_left counts unfinished chunks, so the product is an upper estimate.
        while task._number_left > 0:
            print("Tasks remaining = {0}".format(task._number_left * task._chunksize))
            time.sleep(self.update_interval)


def hi(x):  # This must be defined before `p` if we are to use it in the interpreter
    time.sleep(x // 2)
    return x


a = list(range(50))

p = multiprocessing.Pool()
pp = PoolProgress(p)

res = p.map_async(hi, a)

pp.track(res)
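Since the tracker above reads private attributes, here is a minimal sketch of the same idea using only the public API. It assumes it is acceptable to swap map_async for imap and count results in the parent process as they arrive. The names map_with_progress and report_every are hypothetical, and hi is a shortened stand-in workload.

```python
import multiprocessing
import time


def hi(x):
    # Stand-in workload, kept short so the sketch finishes quickly.
    time.sleep(0.01)
    return x


def map_with_progress(pool, func, items, report_every=10):
    """Like pool.map, but prints a running count of finished tasks.

    Hypothetical helper, not part of the multiprocessing module.
    """
    items = list(items)
    results = []
    # imap yields results in input order as they become available,
    # so the parent process can count them without private attributes.
    for done, value in enumerate(pool.imap(func, items), start=1):
        results.append(value)
        if done % report_every == 0 or done == len(items):
            print("Completed {0} of {1}".format(done, len(items)), flush=True)
    return results


if __name__ == "__main__":
    with multiprocessing.Pool(4) as p:
        values = map_with_progress(p, hi, range(50))
    print(values[:5])
```

Because the counting happens in the parent, the output is not affected by each worker having its own copy of a global counter. For many small tasks, passing a chunksize to imap may reduce overhead, at the cost of coarser progress updates.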
Richard answered Oct 24 '22 00:10


The solution from Richard works well with a low number of jobs, but for some reason it seems to freeze with a very high number of jobs. I found it best to poll the result object directly:

import multiprocessing
import time

def track_job(job, update_interval=3):
    # job is the result object returned by map_async;
    # _number_left and _chunksize are private attributes.
    while job._number_left > 0:
        print("Tasks remaining = {0}".format(job._number_left * job._chunksize))
        time.sleep(update_interval)


def hi(x):  # This must be defined before `p` if we are to use it in the interpreter
    time.sleep(x // 2)
    return x


a = list(range(50))

p = multiprocessing.Pool()

res = p.map_async(hi, a)

track_job(res)
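If polling private attributes is a concern, another option might be a completion callback. The following is only a sketch, assuming one apply_async call per item is affordable (it gives up the chunking that map_async does). The Progress class and run_with_callback function are hypothetical names, and hi is a shortened stand-in workload.

```python
import multiprocessing
import threading
import time


def hi(x):
    # Stand-in workload, kept short so the sketch finishes quickly.
    time.sleep(0.01)
    return x


class Progress:
    """Counts completed tasks; called in the parent process by the pool."""

    def __init__(self, total, report_every=10):
        self.total = total
        self.report_every = report_every
        self.done = 0
        self._lock = threading.Lock()

    def __call__(self, _result):
        # The pool invokes callbacks from a helper thread in the parent,
        # so the counter is guarded with a lock.
        with self._lock:
            self.done += 1
            if self.done % self.report_every == 0 or self.done == self.total:
                print("Completed {0} of {1}".format(self.done, self.total), flush=True)


def run_with_callback(pool, func, items):
    items = list(items)
    progress = Progress(len(items))
    pending = [pool.apply_async(func, (item,), callback=progress) for item in items]
    # get() re-raises any exception from the worker; results keep input order.
    results = [job.get() for job in pending]
    return results, progress


if __name__ == "__main__":
    with multiprocessing.Pool(4) as p:
        values, _ = run_with_callback(p, hi, range(50))
    print(values[:5])
```

The callback only fires for tasks that succeed, so a failed task would not be counted; an error_callback could be added if failures need reporting too.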
Lucas Pugens Fernandes answered Oct 24 '22 00:10