Python multiprocessing Pool: How to exit the script when one of the worker processes determines no more work needs to be done?

import multiprocessing as mp
from itertools import product

mp.set_start_method('spawn')
total_count = Counter(0)  # Counter is a shared counter class defined elsewhere
pool = mp.Pool(initializer=init, initargs=(total_count,), processes=num_proc)

pool.map(part_crack_helper, product(seed_str, repeat=4))
pool.close()
pool.join()

So I have a pool of worker processes that do some work. The pool only needs to find one solution; therefore, when one of the worker processes finds the solution, I want to stop everything.

One way I thought of was just calling sys.exit(). However, that doesn't seem to work properly, since the other processes keep running.

Another way was to check the return value of each call (the return value of the part_crack_helper function) and terminate the pool from there. However, I don't know how to do that when using map.

How should I achieve this?

asked Oct 31 '15 by whiteSkar

2 Answers

You can use the callback parameter of Pool.apply_async.

Something like this should do the job for you.

from itertools import product
from multiprocessing import Pool


def part_crack_helper(args):
    solution = do_job(args)  # do_job is the actual work, defined elsewhere
    return bool(solution)


class Worker:
    def __init__(self, workers, initializer, initargs):
        self.pool = Pool(processes=workers,
                         initializer=initializer,
                         initargs=initargs)

    def callback(self, result):
        # Called in the parent process as each task completes.
        if result:
            print("Solution found! Yay!")
            self.pool.terminate()  # stop all remaining workers

    def do_job(self):
        for args in product(seed_str, repeat=4):
            self.pool.apply_async(part_crack_helper,
                                  args=(args,),  # pass the tuple as one argument
                                  callback=self.callback)

        self.pool.close()
        self.pool.join()
        print("good bye")


w = Worker(num_proc, init, [total_count])
w.do_job()
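
If you do not need a per-task callback, a simpler variant of the same idea is to iterate over Pool.imap_unordered in the parent and terminate the pool as soon as a worker reports success. This is only a sketch, reusing num_proc, seed_str, and part_crack_helper from the question:

from itertools import product
from multiprocessing import Pool

with Pool(processes=num_proc) as pool:
    # imap_unordered yields results as soon as they are ready,
    # so the parent can react to the first success immediately.
    for found in pool.imap_unordered(part_crack_helper,
                                     product(seed_str, repeat=4)):
        if found:
            pool.terminate()  # stop the remaining workers
            break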
answered Sep 22 '22 by noxdafox


If you are okay with using another library, you could solve it the following way with Pebble. The advantage of this solution is that you can additionally specify a timeout: the program ends either when one worker succeeds or when the time runs out:

from pebble import ProcessPool, ProcessExpired
from concurrent.futures import TimeoutError
import time

pool = ProcessPool()

def my_function(args):
    # Simulated work: each task sleeps for a different amount of time.
    print("running " + str(args))
    time.sleep((args + 1) * 30)
    print("process won: " + str(args))
    return True


start_time = time.time()

future = pool.map(my_function, range(4), timeout=65)
iterator = future.result()

while True:
    try:
        result = next(iterator)
        if result:
            pool.stop()           # discard any tasks still pending
            pool.join(timeout=0)
            break
    except StopIteration:         # every task has been consumed
        break
    except TimeoutError as error:
        print("function took longer than %d seconds" % error.args[1])
    except ProcessExpired as error:
        print("%s. Exit code: %d" % (error, error.exitcode))
    except Exception as error:
        print("function raised %s" % error)
        print(error.traceback)  # Python's traceback of the remote process

print("whole time: " + str(time.time() - start_time))
answered Sep 21 '22 by Felix