 

Python: How can I check the number of pending tasks in a multiprocessing.Pool?

I have a small pool of workers (4) and a very large list of tasks (~5000). I'm using a pool and submitting the tasks with map_async(). Because each task is fairly long, I'm forcing a chunksize of 1 so that one long task can't hold up shorter ones.

What I'd like to do is periodically check how many tasks are left to be submitted. I know at most 4 will be active at a time; what I care about is how many are left to process.

I've googled around and I can't find anybody doing this.

Some simple code to help:

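import multiprocessing
import time

def mytask(num):
    print('Started task, sleeping %s' % num)
    time.sleep(num)

if __name__ == '__main__':
    pool = multiprocessing.Pool(4)
    # chunksize=1 so one long task can't hold up shorter ones in the same chunk
    jobs = pool.map_async(mytask, [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4], chunksize=1)
    pool.close()  # no more tasks will be submitted

    while True:
        if not jobs.ready():
            # <somethingtogettasks>: how do I get the number of remaining tasks?
            remaining = '?'
            print("We're not done yet, %s tasks to go!" % remaining)
            jobs.wait(2)
        else:
            break
    pool.join()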
asked Apr 04 '11 at 18:04 by jkeating




2 Answers

If you submit work with apply_async, you can check the number of pending jobs by inspecting the Pool._cache attribute. This is where each ApplyResult is stored until its result is available, so its length equals the number of ApplyResults still pending.

import multiprocessing as mp
import random
import time


def job():
    # Simulate work of random length
    time.sleep(random.randint(1, 10))
    print("job finished")

if __name__ == '__main__':
    pool = mp.Pool(5)
    for _ in range(10):
        pool.apply_async(job)

    # Pool._cache is private: one entry per unfinished apply_async() call
    while pool._cache:
        print("number of jobs pending:", len(pool._cache))
        time.sleep(2)

    pool.close()
    pool.join()
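Because _cache is private, a sketch that relies only on public API is to keep the AsyncResult objects returned by apply_async() and count the ones whose ready() is still False. The helper name count_pending, the pool size, and the short sleep times are illustrative assumptions, not part of the original answer.

```python
import multiprocessing as mp
import time


def job(seconds):
    # Simulate a task of the given length
    time.sleep(seconds)
    return seconds


def count_pending(results):
    # Hypothetical helper: number of submitted tasks that have no result yet
    return sum(1 for r in results if not r.ready())


if __name__ == '__main__':
    with mp.Pool(2) as pool:
        results = [pool.apply_async(job, (0.2,)) for _ in range(6)]
        while count_pending(results):
            print("number of jobs pending:", count_pending(results))
            time.sleep(0.1)
        print("all done:", [r.get() for r in results])
```

Note that this counts tasks that are running as well as those still queued, just like len(pool._cache) does.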
answered Oct 05 '22 at 08:10 by bombs


It looks like jobs._number_left is what you want. The leading underscore indicates that it is an internal attribute that may change at the whim of the developers, but it seems to be the only way to get that information.
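A minimal sketch of this applied to the question's map_async() loop, assuming CPython's current MapResult internals. In CPython, _number_left counts chunks that have not finished yet (including ones currently running on a worker), so with chunksize=1 it equals the number of unfinished tasks rather than the number still waiting to be submitted. The task list and sleep times are shortened here for illustration.

```python
import multiprocessing
import time


def mytask(num):
    # Stand-in for a long-running task
    time.sleep(num)
    return num


if __name__ == '__main__':
    tasks = [0.3, 0.1, 0.2, 0.4, 0.1, 0.2, 0.3, 0.1]
    pool = multiprocessing.Pool(4)
    # chunksize=1: each task is its own chunk, so _number_left counts tasks
    jobs = pool.map_async(mytask, tasks, chunksize=1)
    pool.close()

    while not jobs.ready():
        # _number_left is private (a CPython implementation detail)
        print("We're not done yet, %s tasks to go!" % jobs._number_left)
        jobs.wait(0.2)

    print("results:", jobs.get())
    pool.join()
```

With a larger chunksize, the same attribute would report remaining chunks, not remaining tasks.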

answered Oct 05 '22 at 06:10 by Brian C. Lane