Suppose that I have a NumPy array of integers.
arr = np.random.randint(0, 1000, 1000)
And I have two arrays lower
and upper
, which represent lower and upper bounds respectively on slices of arr
. These intervals are overlapping and variable-length, however lowers
and uppers
are both guaranteed to be non-decreasing.
lowers = np.array([0, 5, 132, 358, 566, 822])
uppers = np.array([45, 93, 189, 533, 800, 923])
I want to find the min and the max of each slice of arr
defined by lowers
and uppers
, and store these in another array.
out_arr = np.empty((lowers.size, 2))
What is the most efficient way to do this? I'm worried there is not a vectorized approach, as I can't see how I get around indexing in a loop..
My current approach is just the straightforward
for i in range(lowers.size):
arr_v = arr[lowers[i]:uppers[i]]
out_arr[i,0] = np.amin(arr_v)
out_arr[i,1] = np.amax(arr_v)
which leaves me with a desired result like
In [304]: out_arr
Out[304]:
array([[ 26., 908.],
[ 18., 993.],
[ 0., 968.],
[ 3., 999.],
[ 1., 998.],
[ 0., 994.]])
but this is far too slow on my actual data.
The concept of vectorized operations on NumPy allows the use of more optimal and pre-compiled functions and mathematical operations on NumPy array objects and data sequences. The Output and Operations will speed up when compared to simple non-vectorized operations.
The max() Function — Find the Largest Element of a List. In Python, there is a built-in function max() you can use to find the largest number in a list. To use it, call the max() on a list of numbers. It then returns the greatest number in that list.
Method. To add the two arrays together, we will use the numpy. add(arr1,arr2) method.
Because the Numpy array is densely packed in memory due to its homogeneous type, it also frees the memory faster. So overall a task executed in Numpy is around 5 to 100 times faster than the standard python list, which is a significant leap in terms of speed.
Ok, here is how to at least down-size the original problem using np.minimum.reduceat
:
lu = np.r_[lowers, uppers]
so = np.argsort(lu)
iso = np.empty_like(so)
iso[so] = np.arange(len(so))
cut = len(lowers)
lmin = np.minimum.reduceat(arr, lu[so])
for i in range(cut):
print(min(lmin[iso[i]:iso[cut+i]]), min(arr[lowers[i]:uppers[i]]))
# 33 33
# 7 7
# 5 5
# 0 0
# 3 3
# 7 7
What this doesn't achieve is getting rid of the main loop but at least the data were reduced from a 1000 element array to a 12 element one.
Update:
With small overlaps @Eric Hansen's own solutions are hard to beat. I'd still like to point out that if there are substantial overlaps then it may even be worthwhile to combine both methods. I don't have numba
, so below is just a twopass version that combines my preprossing with Eric's pure numpy
solution which also serves as a benchmark in the form of onepass
:
import numpy as np
from timeit import timeit
def twopass(lowers, uppers, arr):
lu = np.r_[lowers, uppers]
so = np.argsort(lu)
iso = np.empty_like(so)
iso[so] = np.arange(len(so))
cut = len(lowers)
lmin = np.minimum.reduceat(arr, lu[so])
return np.minimum.reduceat(lmin, iso.reshape(2,-1).T.ravel())[::2]
def onepass(lowers, uppers, arr):
mixture = np.empty((lowers.size*2,), dtype=lowers.dtype)
mixture[::2] = lowers; mixture[1::2] = uppers
return np.minimum.reduceat(arr, mixture)[::2]
arr = np.random.randint(0, 1000, 1000)
lowers = np.array([0, 5, 132, 358, 566, 822])
uppers = np.array([45, 93, 189, 533, 800, 923])
print('small')
for f in twopass, onepass:
print('{:18s} {:9.6f} ms'.format(f.__name__,
timeit(lambda: f(lowers, uppers, arr),
number=10)*100))
arr = np.random.randint(0, 1000, 10**6)
lowers = np.random.randint(0, 8*10**5, 10**4)
uppers = np.random.randint(2*10**5, 10**6, 10**4)
swap = lowers > uppers
lowers[swap], uppers[swap] = uppers[swap], lowers[swap]
print('large')
for f in twopass, onepass:
print('{:18s} {:10.4f} ms'.format(f.__name__,
timeit(lambda: f(lowers, uppers, arr),
number=10)*100))
Sample run:
small
twopass 0.030880 ms
onepass 0.005723 ms
large
twopass 74.4962 ms
onepass 3153.1575 ms
An improved version of my original attempt I came up with based on Paul Panzer's suggestion of reduceat
is
mixture = np.empty((lowers.size*2,), dtype=lowers.dtype)
mixture[::2] = lowers; mixture[1::2] = uppers
np.column_stack((np.minimum.reduceat(arr, mixture)[::2],
np.maximum.reduceat(arr, mixture)[::2]))
On a sample size comparable to my actual data, this runs in 4.22 ms on my machine compared to my original solution taking 73 ms.
Even faster though is to just use Numba with my original solution
from numba import jit
@jit
def get_res():
out_arr = np.empty((lowers.size, 2))
for i in range(lowers.size):
arr_v = arr[lowers[i]:uppers[i]]
out_arr[i,0] = np.amin(arr_v)
out_arr[i,1] = np.amax(arr_v)
return out_arr
which runs in 100 microseconds on my machine.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With