
Python multiprocessing pool retries

Is there a way to re-send a piece of data for processing, if the original computation failed, using a simple pool?

import random
from multiprocessing import Pool

def f(x):
   if random.getrandbits(1):
       raise ValueError("Retry this computation")
   return x*x

p = Pool(5)
# If one of these f(x) calls fails, retry it with another (or same) process
p.map(f, [1,2,3])
asked Jul 18 '12 at 02:07 by atp



1 Answer

You can use a Queue to feed back failures into the Pool through a loop in the initiating Process:

import multiprocessing as mp
import random

def f(x):
    if random.getrandbits(1):
        # on failure / exception catch
        f.q.put(x)
        return None
    return x*x

def f_init(q):
    f.q = q

def main(pending):
    total_items = len(pending)
    successful = []
    failure_tracker = []

    q = mp.Queue()
    p = mp.Pool(None, f_init, [q])
    results = p.imap(f, pending)
    retry_results = []
    while len(successful) < total_items:
        successful.extend([r for r in results if r is not None])
        successful.extend([r for r in retry_results if r is not None])
        failed_items = []
        while not q.empty():
            failed_items.append(q.get())
        if failed_items:
            failure_tracker.append(failed_items)
            retry_results = p.imap(f, failed_items)
    p.close()
    p.join()

    print("Results: %s" % successful)
    print("Failures: %s" % failure_tracker)

if __name__ == '__main__':
    main(range(1, 10))

The output is like this:

Results: [1, 4, 36, 49, 25, 81, 16, 64, 9]
Failures: [[3, 4, 5, 8, 9], [3, 8, 4], [8, 3], []]

A Pool can't be shared between multiple processes, hence this Queue-based approach. If you try to pass a pool as a parameter to the pool's worker processes, you will get this error:

NotImplementedError: pool objects cannot be passed between processes or pickled

Alternatively, you could attempt a few immediate retries inside your function f to avoid the synchronisation overhead. Which approach fits depends on how long the function should wait before retrying and on how likely an immediate retry is to succeed.
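The immediate-retry idea might look roughly like the sketch below. It is only an illustration: flaky_square stands in for the question's f, and square_with_retries and MAX_ATTEMPTS are hypothetical names, not part of the multiprocessing API. Failures are reported in the return value rather than through a Queue, so the parent can still see which inputs never succeeded.

```python
import random
from multiprocessing import Pool

MAX_ATTEMPTS = 5  # hypothetical limit; tune it for your workload


def flaky_square(x):
    """Stand-in for the question's f: fails roughly half the time."""
    if random.getrandbits(1):
        raise ValueError("Retry this computation")
    return x * x


def square_with_retries(x, attempts=MAX_ATTEMPTS):
    """Call flaky_square up to `attempts` times inside the same worker.

    Returns (x, result, None) on success, or (x, None, last_error_message)
    once the attempts are used up.
    """
    last_error = None
    for _ in range(attempts):
        try:
            return x, flaky_square(x), None
        except ValueError as exc:
            last_error = str(exc)
    return x, None, last_error


if __name__ == "__main__":
    with Pool(5) as pool:
        for x, result, error in pool.map(square_with_retries, [1, 2, 3]):
            print(x, result, error)
```

Because the retry loop runs entirely inside the worker, no extra communication with the parent is needed. The trade-off is that a retry happens immediately, which only helps if the failure is likely to be transient.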


Old Answer: For the sake of completeness, here is my old answer. It is less efficient than resubmitting directly into the pool, but it may still be relevant depending on the use case, because it provides a natural way to handle and limit n-level retries:

You can use a Queue to aggregate failures and resubmit at the end of each run, over multiple runs:

import multiprocessing as mp
import random


def f(x):
    if random.getrandbits(1):
        # on failure / exception catch
        f.q.put(x)
        return None
    return x*x

def f_init(q):
    f.q = q

def main(pending):
    run_number = 1
    while pending:
        jobs = pending
        pending = []

        q = mp.Queue()
        p = mp.Pool(None, f_init, [q])
        results = p.imap(f, jobs)
        p.close()

        p.join()
        failed_items = []
        while not q.empty():
            failed_items.append(q.get())
        successful = [r for r in results if r is not None]
        print("(%d) Succeeded: %s" % (run_number, successful))
        print("(%d) Failed:    %s" % (run_number, failed_items))
        print()
        pending = failed_items
        run_number += 1

if __name__ == '__main__':
    main(range(1, 10))

with output like this:

(1) Succeeded: [9, 16, 36, 81]
(1) Failed:    [2, 1, 5, 7, 8]

(2) Succeeded: [64]
(2) Failed:    [2, 1, 5, 7]

(3) Succeeded: [1, 25]
(3) Failed:    [2, 7]

(4) Succeeded: [49]
(4) Failed:    [2]

(5) Succeeded: [4]
(5) Failed:    []
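To actually cap the number of retry rounds, a variation along these lines could be used. This is a sketch under assumptions, not the code above: it swaps the Queue for status tuples returned by the worker, and try_square, run_with_retry_rounds, max_rounds and map_func are hypothetical names introduced for illustration.

```python
import random
from multiprocessing import Pool


def try_square(x):
    """Return (x, ok, value); failure is reported in the result, not a Queue."""
    if random.getrandbits(1):
        return x, False, None
    return x, True, x * x


def run_with_retry_rounds(worker, items, max_rounds, map_func):
    """Resubmit failed items for at most max_rounds rounds.

    map_func is anything shaped like map(worker, items), e.g. pool.map.
    Assumes the items are hashable and unique, since results are keyed by item.
    Returns (results_by_item, items_still_failing).
    """
    pending = list(items)
    results = {}
    for _ in range(max_rounds):
        if not pending:
            break
        failed = []
        for x, ok, value in map_func(worker, pending):
            if ok:
                results[x] = value
            else:
                failed.append(x)
        pending = failed
    return results, pending


if __name__ == "__main__":
    with Pool() as pool:
        done, gave_up = run_with_retry_rounds(try_square, range(1, 10), 3, pool.map)
    print("Succeeded:", done)
    print("Gave up on:", gave_up)
```

Passing the mapping function in as a parameter keeps the retry logic independent of the pool, so it can be exercised with the built-in map as well as with pool.map.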
answered Sep 28 '22 at 04:09 by Preet Kukreti