Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Python Joblib Parallel: How to combine results per worker?

Context

I have a function that produces a large 2D numpy array (with fixed shape) as output. I am calling this function 1000 times using joblib (Parallel with a multiprocessing backend) on 8 CPUs. At the end of the job, I add up all the arrays element-wise (using np.sum) to produce a single 2D array that I am interested in. However, when I attempt this, I run out of RAM. I assume that this is because the 1000 arrays would need to be stored in RAM until they are summed at the end.

Question

Is there a way to get each worker to add up its arrays as it goes? For example, worker 1 would add array 2 to array 1, and then discard array 2 before computing array 3, and so on. This way, there would only be a maximum of 8 arrays (for 8 CPUs) stored in RAM at any point in time, and these could be summed up at the end to get the same answer.

like image 645
Miguel Avatar asked Jun 30 '26 11:06

Miguel


1 Answers

The facts that you know your arguments in advance and the time for calculation not varying much with the actual argument(s) simplifies the task. It allows for assigning complete jobs for every worker process at start and just summing up the results at the end, just how you proposed.

In the code below every spawned process gets an "equal" (as much as possible) part of all arguments (its args_batch) and sums up the intermediate results from calling the target function in it's own result-array. These arrays get summed up finally by the parent process.

The "delayed" function here in the example is not the target function which calculates an array, but a processing function (worker) to which the target function (calc_array) gets passed as part of the job along with the batch of arguments.

import numpy as np
from itertools import repeat
from time import sleep
from joblib import Parallel, delayed


def calc_array(v):
    """Create an array with specified shape and
    fill it up with value v, then kill some time.

    Dummy target function.
    """
    new_array = np.full(shape=SHAPE, fill_value=v)
    # delay result:
    cnt = 10_000_000
    for _ in range(cnt):
        cnt -= 1

    return new_array


def worker(func, args_batch):
    """Call func with every packet of arguments received and update
    result array on the run.

    Worker function which runs the job in each spawned process.
    """
    results = np.zeros(SHAPE)
    for args_ in args_batch:
        new_array = func(*args_)
        np.sum([results, new_array], axis=0, out=results)

    return results


def main(func, arguments, n_jobs, verbose):

    with Parallel(n_jobs=n_jobs, verbose=verbose) as parallel:

        # bundle up jobs:
        funcs = repeat(func, n_jobs)  # functools.partial seems not pickle-able
        args_batches = np.array_split(arguments, n_jobs, axis=0)
        jobs = zip(funcs, args_batches)

        result = sum(parallel(delayed(worker)(*job) for job in jobs))
        assert np.all(result == sum(range(CALLS_TOTAL)))

    sleep(1)  # just to keep stdout ordered
    print(result)


if __name__ == '__main__':

    SHAPE = (4, 4)  # shape of array calculated by calc_array
    N_JOBS = 8
    CALLS_TOTAL = 100
    VERBOSE = 10
    ARGUMENTS = np.asarray([*zip(range(CALLS_TOTAL))])
    # array([[0], [1], [2], ...]])
    # zip to bundle arguments in a container so we have less code to
    # adapt when feeding a function with multiple parameters

    main(func=calc_array, arguments=ARGUMENTS, n_jobs=N_JOBS, verbose=VERBOSE)
like image 52
Darkonaut Avatar answered Jul 02 '26 23:07

Darkonaut