I need to iterate over a huge numpy array to build three lists, depending of result of an expensive C library call, which accepts scalar values and cannot be vectorized (or at least I don't see how to do it). This loop can take from few hours up to a couple of days and I can see that performances degrade with time (I log the progress and I can see is much slower towards the end) maybe due the growing sizes of lists (??).
Code look like this (I omitted code related to print progress and some micro optimizations):
import numpy as np
import swig_c_lib
def build_indexes(large_numpy_array_1, large_numpy_array_2):
xs = []
ys = []
idxs = []
for (x, y), value in np.ndenumerate(large_numpy_array_1):
if not (value <= -1.0e+10):
try:
index = swig_c_lib.slow_computation(np.asscalar(large_numpy_array_2[x, y]), np.asscalar(large_numpy_array_1[x, y]))
except swig_lib.InternalError:
pass
else:
xs.append(x)
ys.append(y)
idxs.append(index)
return np.asarray(xs), np.asarray(ys), np.asarray(idxs)
Probably, one solution can be to split large input numpy arrays into 4 subarrays and use multiprocessing (but then I'm not sure how to merge results). Anyone who can help here?
This is a problem where the dask module can help
Let's start by creating two arrays a1 and a2. They can have an arbitrary shape, but for this example we will make them n by n, where n=30. We flatten the arrays and stack them together, to form a single big array of shape (2,900). Each pair along the axis=1 dimension is a pair of items at the same position on a1 and a2:
In[1]:
import numpy as np
n = 30
a1 = np.random.rand(n, n)
a2 = np.random.rand(n, n)
a = np.stack((a1.flat, a2.flat))
a.shape
Out[1]:
(2, 900)
We then proceed to split the array in chunks. We choose 250 chunks:
In[2]:
chunks = np.array_split(a, 250, axis=1)
len(chunks)
Out[2]:
250
In[3]:
chunks[0]
Out[3]:
array([[ 0.54631022, 0.8428954 , 0.11835531, 0.59720379],
[ 0.51184696, 0.64365038, 0.74471553, 0.67035977]])
We will now define a slow_function, which will play the role of the slow computation that is described in the question. We also define a way to use numpy to apply the slow function on one of the chunks.
In[4]:
def slow_function(pair):
return np.asscalar(pair[0]) + np.asscalar(pair[1])
def apply_on_chunk(chunk):
return np.apply_along_axis(slow_function, 0, chunk)
apply_on_chunk(chunks[0])
Out[4]:
array([ 1.05815717, 1.48654578, 0.86307085, 1.26756356])
In the above, note that apply_on_chunk works regardless of the size of the axis=1 in the chunk. In other words we could just as well go ahead and call apply_on_chunk(a) to calculate the result over the entire initial array.
dask.bagWe now show how to parallelize the computation along the chunks using the map method of a dask.bag object:
In[5]:
import dask.bag as db
mybag = db.from_sequence(chunks)
In[6]:
%time myresult = mybag.map(apply_on_chunk)
Out[6]:
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 1.62 ms
At this point we have not yet calculated anything. We have described to dask how it is that we want our result to be calculated. This step happens relatively quickly, in approx 1.6 ms
To go ahead and trigger the actual computation, we call the compute method on myresult:
In[7]:
%time myresult = myresult.compute()
Out[7]:
CPU times: user 256 ms, sys: 24 ms, total: 280 ms
Wall time: 362 ms
The above takes a little more than 1/3 of a second to run. What we obtain is a list of arrays, corresponding to the result of apply_on_chunk called over each element in the dask.bag. We inspect the first five of these:
In[8]:
myresult[:5]
Out[8]:
[array([ 1.05815717, 1.48654578, 0.86307085, 1.26756356]),
array([ 1.48913909, 1.25028145, 1.36707112, 1.04826167]),
array([ 0.90069768, 1.24921559, 1.23146726, 0.84963409]),
array([ 0.72292347, 0.87069598, 1.35893143, 1.02451637]),
array([ 1.16422966, 1.35559156, 0.9071381 , 1.17786002])]
If we really want the result in final form we have to call np.concatenate to put together the results for all of the chunks. We do that below, and show the performance of the computation:
In[9]:
%time myresult = np.concatenate(\
db.from_sequence(\
np.array_split(np.stack((a1.flat, a2.flat)), 250, axis=1)\
).map(apply_on_chunk).compute())
Out[9]:
CPU times: user 232 ms, sys: 40 ms, total: 272 ms
Wall time: 342 ms
The full calculation, which gives us a single array with the results takes a little more than 1/3 of a second to run.
What if we just go ahead and do the calculation straight up on the entire array, without using any parallelization:
In[10]:
%time myresult_ = np.apply_along_axis(slow_function, 0, np.stack((a1.flat, a2.flat)))
Out[10]:
CPU times: user 12 ms, sys: 0 ns, total: 12 ms
Wall time: 12.9 ms
The straight up calculation is a lot faster. But the reason for this is that slow_function is not really that slow at the moment. It is just the summation of two elements, which does not take much time at all. The sluggishness that we are seeing in the dask.bag calculation is the overhead arising from parallelization.
Let's go ahead and try again, but this time with a truly slow function, one that takes approximately 20 ms per call:
In[11]:
n = 30
a1 = np.random.rand(n, n)
a2 = np.random.rand(n, n)
import time
def slow_function(pair):
time.sleep(0.02)
return np.asscalar(pair[0]) + np.asscalar(pair[1])
def apply_on_chunk(chunk):
return np.apply_along_axis(slow_function, 0, chunk)
Let's compare what dask can do versus just running the calculation straight up on the entire array:
In[12]:
%time myresult = np.concatenate(\
db.from_sequence(\
np.array_split(np.stack((a1.flat, a2.flat)), 250, axis=1)\
).map(apply_on_chunk).compute())
Out[12]:
CPU times: user 236 ms, sys: 20 ms, total: 256 ms
Wall time: 4.75 s
In[13]:
%time myresult_ = np.apply_along_axis(slow_function, 0, np.stack((a1.flat, a2.flat)))
Out[13]:
CPU times: user 72 ms, sys: 16 ms, total: 88 ms
Wall time: 18.2 s
As can be seen, dask is leveraging multiprocessing to speed up the computation. We get about a factor of 4 speed up.
For completeness, we show that the results of dask and the straight up calculation agree with each other:
In[14]:
np.testing.assert_array_equal(myresult, myresult_)
print("success")
Out[14]:
success
Note that the function in the question returns a tuple
np.asarray(xs), np.asarray(ys), np.asarray(idxs)
What we have described involves only the calculation of np.asarray(idxs). The first two items in the returned tuple can be readily obtained if one knows the shape of the original a1 and a2.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With