Context
I have a function that produces a large 2D numpy array (with fixed shape) as output. I am calling this function 1000 times using joblib (Parallel with a multiprocessing backend) on 8 CPUs. At the end of the job, I add up all the arrays element-wise (using np.sum) to produce a single 2D array that I am interested in. However, when I attempt this, I run out of RAM. I assume that this is because the 1000 arrays would need to be stored in RAM until they are summed at the end.
Question
Is there a way to get each worker to add up its arrays as it goes? For example, worker 1 would add array 2 to array 1, and then discard array 2 before computing array 3, and so on. This way, there would only be a maximum of 8 arrays (for 8 CPUs) stored in RAM at any point in time, and these could be summed up at the end to get the same answer.
The facts that you know your arguments in advance and the time for calculation not varying much with the actual argument(s) simplifies the task. It allows for assigning complete jobs for every worker process at start and just summing up the results at the end, just how you proposed.
In the code below every spawned process gets an "equal" (as much as possible) part of all arguments (its args_batch) and sums up the intermediate results from calling the target function in it's own result-array. These arrays get summed up finally by the parent process.
The "delayed" function here in the example is not the target function which calculates an array, but a processing function (worker) to which the target function (calc_array) gets passed as part of the job along with the batch of arguments.
import numpy as np
from itertools import repeat
from time import sleep
from joblib import Parallel, delayed
def calc_array(v):
"""Create an array with specified shape and
fill it up with value v, then kill some time.
Dummy target function.
"""
new_array = np.full(shape=SHAPE, fill_value=v)
# delay result:
cnt = 10_000_000
for _ in range(cnt):
cnt -= 1
return new_array
def worker(func, args_batch):
"""Call func with every packet of arguments received and update
result array on the run.
Worker function which runs the job in each spawned process.
"""
results = np.zeros(SHAPE)
for args_ in args_batch:
new_array = func(*args_)
np.sum([results, new_array], axis=0, out=results)
return results
def main(func, arguments, n_jobs, verbose):
with Parallel(n_jobs=n_jobs, verbose=verbose) as parallel:
# bundle up jobs:
funcs = repeat(func, n_jobs) # functools.partial seems not pickle-able
args_batches = np.array_split(arguments, n_jobs, axis=0)
jobs = zip(funcs, args_batches)
result = sum(parallel(delayed(worker)(*job) for job in jobs))
assert np.all(result == sum(range(CALLS_TOTAL)))
sleep(1) # just to keep stdout ordered
print(result)
if __name__ == '__main__':
SHAPE = (4, 4) # shape of array calculated by calc_array
N_JOBS = 8
CALLS_TOTAL = 100
VERBOSE = 10
ARGUMENTS = np.asarray([*zip(range(CALLS_TOTAL))])
# array([[0], [1], [2], ...]])
# zip to bundle arguments in a container so we have less code to
# adapt when feeding a function with multiple parameters
main(func=calc_array, arguments=ARGUMENTS, n_jobs=N_JOBS, verbose=VERBOSE)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With