mp.set_start_method('spawn')
total_count = Counter(0)
pool = mp.Pool(initializer=init, initargs=(total_count,), processes=num_proc)
pool.map(part_crack_helper, product(seed_str, repeat=4))
pool.close()
pool.join()
So I have a pool of worker processes that does some work. It only needs to find one solution, so when one of the worker processes finds it, I want to stop everything.
One way I thought of was just calling sys.exit(). However, that doesn't seem to work properly, since the other processes keep running.
Another way was to check the return value of each call (the return value of part_crack_helper) and terminate the pool as soon as one of them succeeds. However, I don't know how to do that when using map.
How should I achieve this?
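For illustration, a minimal sketch of the "check each return value" idea is pool.imap_unordered, which yields results as workers finish (names reused from the snippet above, and assuming part_crack_helper returns something truthy when it finds the solution):

# Sketch only: imap_unordered yields results in completion order,
# so the parent process can react as soon as one worker succeeds.
with mp.Pool(initializer=init, initargs=(total_count,), processes=num_proc) as pool:
    for found in pool.imap_unordered(part_crack_helper, product(seed_str, repeat=4)):
        if found:
            pool.terminate()  # stop the remaining workers
            break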
You can use callbacks from Pool.apply_async.
Something like this should do the job for you.
from itertools import product
from multiprocessing import Pool

def part_crack_helper(args):
    solution = do_job(args)
    if solution:
        return True
    else:
        return False

class Worker():
    def __init__(self, workers, initializer, initargs):
        self.pool = Pool(processes=workers,
                         initializer=initializer,
                         initargs=initargs)

    def callback(self, result):
        # Runs in the parent process whenever a task finishes.
        if result:
            print("Solution found! Yay!")
            self.pool.terminate()

    def do_job(self):
        for args in product(seed_str, repeat=4):
            self.pool.apply_async(part_crack_helper,
                                  args=(args,),  # pass the product tuple as a single argument
                                  callback=self.callback)

        self.pool.close()
        self.pool.join()
        print("good bye")

w = Worker(num_proc, init, [total_count])
w.do_job()
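For comparison, here is a self-contained sketch of the same apply_async-plus-callback pattern with hypothetical stand-ins (find_solution, Stopper and the four-letter alphabet are made up so the example runs on its own):

from itertools import product
from multiprocessing import Pool
import string

def find_solution(candidate):
    # Hypothetical stand-in for part_crack_helper: succeeds on one specific tuple.
    return candidate == ('d', 'd', 'd', 'd')

class Stopper:
    """Callback object; called in the parent process whenever a task finishes."""
    def __init__(self, pool):
        self.pool = pool

    def __call__(self, result):
        if result:
            print("Solution found! Terminating remaining workers.")
            self.pool.terminate()

if __name__ == '__main__':
    pool = Pool(processes=4)
    stopper = Stopper(pool)
    try:
        for candidate in product(string.ascii_lowercase[:4], repeat=4):
            pool.apply_async(find_solution, args=(candidate,), callback=stopper)
    except ValueError:
        # The callback already terminated the pool while we were still submitting tasks.
        pass
    pool.close()
    pool.join()
    print("done")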
If you are OK with using another library, you could solve it the following way with Pebble. The advantage of this solution is that you can additionally specify a timeout, so the program ends either when one worker succeeds or when it runs out of time:
from pebble import ProcessPool, ProcessExpired
from concurrent.futures import TimeoutError
import time

pool = ProcessPool()

def my_function(args):
    print("running " + str(args))
    time.sleep((args + 1) * 30)
    print("process won:" + str(args))
    return True

start_time = time.time()
future = pool.map(my_function, range(4), timeout=65)
iterator = future.result()

while True:
    try:
        result = next(iterator)
        if result:
            pool.stop()
            pool.join(timeout=0)
            break
    except StopIteration:
        break
    except TimeoutError as error:
        print("function took longer than %d seconds" % error.args[1])
    except ProcessExpired as error:
        print("%s. Exit code: %d" % (error, error.exitcode))
    except Exception as error:
        print("function raised %s" % error)
        print(error.traceback)  # Python's traceback of remote process

print("whole time: " + str(time.time() - start_time))