Is there a way to re-send a piece of data for processing, if the original computation failed, using a simple pool?
import random
from multiprocessing import Pool

def f(x):
    if random.getrandbits(1):
        raise ValueError("Retry this computation")
    return x*x

p = Pool(5)
# If one of these f(x) calls fails, retry it with another (or same) process
p.map(f, [1, 2, 3])
You can use a Queue to feed back failures into the Pool through a loop in the initiating process:
import multiprocessing as mp
import random

def f(x):
    if random.getrandbits(1):
        # on failure / exception catch, report the failed input to the parent
        f.q.put(x)
        return None
    return x*x

def f_init(q):
    # runs once in each worker process, attaching the shared queue to f
    f.q = q

def main(pending):
    total_items = len(pending)
    successful = []
    failure_tracker = []

    q = mp.Queue()
    p = mp.Pool(None, f_init, [q])
    results = p.imap(f, pending)

    retry_results = []
    while len(successful) < total_items:
        successful.extend([r for r in results if r is not None])
        successful.extend([r for r in retry_results if r is not None])

        # drain the failures reported by the workers and resubmit them
        failed_items = []
        while not q.empty():
            failed_items.append(q.get())
        if failed_items:
            failure_tracker.append(failed_items)
            retry_results = p.imap(f, failed_items)

    p.close()
    p.join()

    print("Results: %s" % successful)
    print("Failures: %s" % failure_tracker)

if __name__ == '__main__':
    main(range(1, 10))
The output is like this:
Results: [1, 4, 36, 49, 25, 81, 16, 64, 9]
Failures: [[3, 4, 5, 8, 9], [3, 8, 4], [8, 3], []]
A Pool can't be shared between multiple processes, hence this Queue-based approach. If you try to pass a pool as a parameter to the pool's processes, you will get this error:
NotImplementedError: pool objects cannot be passed between processes or pickled
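For illustration, a minimal sketch that should trigger this error by handing the Pool itself to one of its workers (the worker function here is hypothetical):
import multiprocessing as mp

def worker(pool):
    # hypothetical worker that receives the Pool itself; it never actually
    # runs, because pickling the Pool argument fails first
    return None

if __name__ == '__main__':
    p = mp.Pool(2)
    # raises NotImplementedError: pool objects cannot be passed
    # between processes or pickled
    p.apply(worker, (p,))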
You could alternatively attempt a few immediate retries within your function f, to avoid the synchronisation overhead, as sketched below. It really is a matter of how long your function should wait before retrying, and of how likely a retry is to succeed immediately.
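A minimal sketch of that approach, assuming the failure is transient and worth retrying a few times (the max_retries parameter is hypothetical, not part of the original code):
import random
from multiprocessing import Pool

def f(x, max_retries=3):
    # retry immediately inside the worker instead of round-tripping the
    # failure through a Queue; this blocks the worker while it retries
    for _ in range(max_retries + 1):
        if random.getrandbits(1):  # stand-in for a transient failure
            continue
        return x*x
    return None  # give up after max_retries + 1 attempts

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))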
Old Answer: For the sake of completeness, here is my old answer, which isn't as optimal as resubmitting directly into the pool, but might still be relevant depending on the use case, because it provides a natural way to deal with / limit n-level retries:

You can use a Queue to aggregate failures and resubmit at the end of each run, over multiple runs:
import multiprocessing as mp
import random

def f(x):
    if random.getrandbits(1):
        # on failure / exception catch, report the failed input to the parent
        f.q.put(x)
        return None
    return x*x

def f_init(q):
    f.q = q

def main(pending):
    run_number = 1
    while pending:
        jobs = pending
        pending = []

        # a fresh Pool and Queue per run; whatever failed is retried next run
        q = mp.Queue()
        p = mp.Pool(None, f_init, [q])
        results = p.imap(f, jobs)
        p.close()
        p.join()

        failed_items = []
        while not q.empty():
            failed_items.append(q.get())
        successful = [r for r in results if r is not None]

        print("(%d) Succeeded: %s" % (run_number, successful))
        print("(%d) Failed: %s" % (run_number, failed_items))
        print()
        pending = failed_items
        run_number += 1

if __name__ == '__main__':
    main(range(1, 10))
with output like this:
(1) Succeeded: [9, 16, 36, 81]
(1) Failed: [2, 1, 5, 7, 8]
(2) Succeeded: [64]
(2) Failed: [2, 1, 5, 7]
(3) Succeeded: [1, 25]
(3) Failed: [2, 7]
(4) Succeeded: [49]
(4) Failed: [2]
(5) Succeeded: [4]
(5) Failed: []
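Since the retry loop is explicit in this version, capping retries at n levels is a small change. A minimal sketch, with a hypothetical max_runs parameter that is not part of the original answer:
import multiprocessing as mp
import random

def f(x):
    if random.getrandbits(1):
        f.q.put(x)  # report the failed input
        return None
    return x*x

def f_init(q):
    f.q = q

def main(pending, max_runs=3):  # max_runs is a hypothetical retry cap
    for run_number in range(1, max_runs + 1):
        if not pending:
            break
        q = mp.Queue()
        p = mp.Pool(None, f_init, [q])
        results = list(p.imap(f, pending))
        p.close()
        p.join()

        # whatever failed this run becomes the input for the next run
        pending = []
        while not q.empty():
            pending.append(q.get())
        print("(%d) Succeeded: %s" % (run_number, [r for r in results if r is not None]))
    if pending:
        print("Gave up after %d runs on: %s" % (max_runs, pending))

if __name__ == '__main__':
    main(list(range(1, 10)))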