I have the following numpy random 2D array:

np.random.rand(30, 20)

I want to iterate over each grid cell in the array. If the value of a grid cell is > 0.6, I want to assign the excess to its immediate 8 neighboring cells (in the case of an edge or corner cell, the number of neighboring cells will be fewer). The excess should be redistributed according to one of two user-selected rules: spread evenly among the neighbors, or in proportion to the neighbors' current values.
Is there a way to do this in numpy without resorting to a for loop?
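For reference, the per-cell loop such a rule implies might look like the sketch below (one pass of the "even" rule; the function name one_pass_loop and the simultaneous-update reading are mine, added only to pin down the semantics):

import numpy as np

# A plain double-loop reference for one pass of the "even" rule (a sketch;
# the update is simultaneous, i.e. excess is measured on the original array).
def one_pass_loop(a, threshold=0.6):
    out = a.copy()
    rows, cols = a.shape
    for i in range(rows):
        for j in range(cols):
            excess = a[i, j] - threshold
            if excess <= 0:
                continue
            # collect the (up to 8) in-bounds neighbors of cell (i, j)
            nbrs = [(i + di, j + dj)
                    for di in (-1, 0, 1) for dj in (-1, 0, 1)
                    if (di or dj) and 0 <= i + di < rows and 0 <= j + dj < cols]
            out[i, j] -= excess
            for ni, nj in nbrs:
                out[ni, nj] += excess / len(nbrs)
    return out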
OK, here's my take: in each step the algorithm detects all suprathreshold cells and simultaneously updates all of them and all their neighbours, either evenly or proportionally; this is fully vectorised and comes in two implementations.
Since this procedure typically does not converge in one step, it is placed in a loop; however, for all but the smallest grids its overhead should be minimal, because its payload is substantial. The loop terminates after a user-supplied number of cycles or when there are no suprathreshold cells left. Optionally, it can record the Euclidean deltas between successive iterates.
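To make the structure clear before the long listing, here is a compact sketch of that loop for the even rule only, using scipy.signal.convolve2d (the name level_even_sketch is mine; the optimised code further down does the same thing faster and also handles the proportional rule):

import numpy as np
from scipy import signal

def level_even_sketch(a, threshold=0.6, maxiter=100):
    """Compact sketch of the main loop, 'even' rule only."""
    ring = np.array([[1., 1., 1.], [1., 0., 1.], [1., 1., 1.]])
    # number of actual neighbours per cell: 8 inside, 5 at edges, 3 at corners
    nei_count = signal.convolve2d(np.ones_like(a), ring, mode='same')
    out = a.copy()
    for _ in range(maxiter):
        excess = np.where(out > threshold, out - threshold, 0)
        if not excess.any():                 # no suprathreshold cells left
            return out, True
        # every cell sends excess / (its own neighbour count) to each neighbour
        out += signal.convolve2d(excess / nei_count, ring, mode='same') - excess
    return out, False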
A few words on the algorithm: if it weren't for the boundaries, the even spreading operation could be described as subtracting the pattern of mass p that gets redistributed and then adding the same pattern convolved with the ring kernel k = [1 1 1; 1 0 1; 1 1 1] / 8; similarly, redistribution proportional to the residual mass r can be written as

(1)    r (k * (p / (k * r)))

where * is the convolution operator and multiplication and division are component-wise. Parsing the formula, we see that each point in p is first normalised by the average of the residual masses k * r over its 8 neighbours before it is spread to said neighbours (the other convolution) and scaled with the residual. This prenormalisation guarantees conservation of mass; in particular, it correctly normalises boundaries and corners. Building on this, we see that the boundary problem of the even rule can be solved in a similar fashion: use (1) with r replaced by a sheet of ones.
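A quick numerical check of (1), in case the mass-conservation claim sounds too good (a sketch using scipy.signal.convolve2d; the arrays are arbitrary test data):

import numpy as np
from scipy import signal

k = np.array([[1, 1, 1], [1, 0, 1], [1, 1, 1]]) / 8
r = np.random.rand(5, 7) + 0.1                           # residual mass, kept strictly positive
p = np.zeros_like(r)
p[[0, 0, 2, 4], [0, 6, 3, 6]] = [0.3, 0.2, 0.5, 0.1]     # excess, including corner cells
kr = signal.convolve2d(r, k, mode='same')                # k * r
spread = r * signal.convolve2d(p / kr, k, mode='same')   # r (k * (p / (k * r)))
print(np.isclose(spread.sum(), p.sum()))                 # True: mass is conserved exactly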
Fun side note: with the proportional rule one can build non-converging patterns. Here are two oscillators:
0.7  0    0.8  0    0.8  0
0    0.6  0    0.6  0    0.6
0.8  0    1.0  0    1.0  0
0    0.6  0    0.6  0    0.6

0    0    0    0
0    1.0  0.6  0
0    0    0    0
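If you want to watch one of them, feed it to the level function defined below one iteration at a time (a sketch, assuming the 4 x 6 pattern above):

osc = np.array([[0.7, 0. , 0.8, 0. , 0.8, 0. ],
                [0. , 0.6, 0. , 0.6, 0. , 0.6],
                [0.8, 0. , 1.0, 0. , 1.0, 0. ],
                [0. , 0.6, 0. , 0.6, 0. , 0.6]])
state = osc
for step in range(4):
    state = level(state, rule='proportional', maxiter=1)[0]
    print('after step', step + 1, '- max:', round(state.max(), 3))
# the maximum never drops to the 0.6 threshold, i.e. the pattern keeps oscillating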
The code is below; it is rather long and technical, I'm afraid, but I tried to explain at least the main (fastest) branch. The main function is called level, and there are also a few simple test functions. There are a few print statements, but I think that's the only Python 3 dependency.
import numpy as np
try:
from scipy import signal
HAVE_SCIPY = True
except ImportError:
HAVE_SCIPY = False
import time
SPARSE_THRESH = 0.05
USE_SCIPY = False # actually, numpy workaround is a bit faster
KERNEL = np.zeros((3, 3)) + 1/8
KERNEL[1, 1] = 0
def scipy_ring(a):
"""convolve 2d array a with kernel
1/8 1/8 1/8
1/8 0 1/8
1/8 1/8 1/8
"""
return signal.convolve2d(a, KERNEL, mode='same')
def numpy_ring(a):
"""convolve 2d array a with kernel
1/8 1/8 1/8
1/8 0 1/8
1/8 1/8 1/8
"""
tmp = a.copy()
tmp[:, 1:] += a[:, :-1]
tmp[:, :-1] += a[:, 1:]
out = tmp.copy()
out[1:, :] += tmp[:-1, :]
out[:-1, :] += tmp[1:, :]
return (out - a) / 8
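# Sanity check (optional, illustration only): the two ring convolutions agree, e.g.
#   a = np.random.rand(5, 7)
#   np.allclose(scipy_ring(a), numpy_ring(a))   # -> True (when scipy is installed)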
if USE_SCIPY and HAVE_SCIPY:
conv_with_ring = scipy_ring
else:
conv_with_ring = numpy_ring
# next is yet another implementation of convolution including edge correction.
# what for? it is most of the time much slower than the above but can handle
# sparse inputs and consequently is faster if the fraction of above-threshold
# cells is small (~5% or less)
SPREAD_CACHE = {}
def precomp(sh):
"""precompute sparse representation of operator mapping ravelled 2d
array of shape sh to boundary corrected convolution with ring kernel
1/8 1/8 1/8 / 1/5 0 1/5 0 1/3 \
1/8 0 1/8 | 1/5 1/5 1/5 at edges, 1/3 1/3 at corners |
1/8 1/8 1/8 \ /
stored are
- a table of flat indices encoding neighbours of the
cell whose flat index equals the row no in the table
- two scaled copies of the appropriate weights which
equal 1 / neighbour count
"""
global SPREAD_CACHE
m, n = sh
# m*n grid points, each with up to 8 neighbours:
# tedious but straightforward
indices = np.empty((m*n, 8), dtype=int)
indices[n-1:, 1] = np.arange(m*n - (n-1)) # NE
indices[:-(n+1), 3] = np.arange(n+1, m*n) # SE
indices[:-(n-1), 5] = np.arange(n-1, m*n) # SW
indices[n+1:, 7] = np.arange(m*n - (n+1)) # NW
indices[n:, 0] = np.arange((m-1)*n) # N
indices[:n, [0, 1, 7]] = -1
indices[:-1, 2] = np.arange(1, m*n) # E
indices[n-1::n, 1:4] = -1
indices[:-n, 4] = np.arange(n, m*n) # S
indices[-n:, 3:6] = -1
indices[1:, 6] = np.arange(m*n - 1) # W
indices[::n, 5:] = -1
# weights are constant along rows, therefore m*n vector suffices
nei_count = (indices > -1).sum(axis=-1)
weights = 1 / nei_count
SPREAD_CACHE[sh] = indices, weights, 8 * weights.reshape(sh)
return indices, weights, 8 * weights.reshape(sh)
def iadd_conv_ring(a, out):
"""add boundary corrected convolution of 2d array a with
ring kernel
1/8 1/8 1/8 / 1/5 0 1/5 0 1/3 \
1/8 0 1/8 | 1/5 1/5 1/5 at edges, 1/3 1/3 at corners |
1/8 1/8 1/8 \ /
to out.
IMPORTANT: out must be flat and have one spare field at its end
which is used as a "/dev/NULL" by the indexing trickery
if a is a tuple it is interpreted as a sparse representation of the
form: (flat) indices, values, shape.
"""
if isinstance(a, tuple): # sparse
ind, val, sh = a
k_ind, k_wgt, __ \
= SPREAD_CACHE[sh] if sh in SPREAD_CACHE else precomp(sh)
np.add.at(out, k_ind[ind, :], k_wgt[ind, None]*val[:, None])
else: # dense
sh = a.shape
k_ind, k_wgt, __ \
= SPREAD_CACHE[sh] if sh in SPREAD_CACHE else precomp(sh)
np.add.at(out, k_ind, k_wgt[:, None]*a.ravel()[:, None])
return out
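# Example of the sparse form (illustration only): with sh = (4, 6),
#   iadd_conv_ring((np.array([0, 5]), np.array([0.2, 0.1]), sh), out_flat)
# spreads 0.2 from flat cell 0 and 0.1 from flat cell 5 over their neighbours in out_flat.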
# main function
def level(input, threshold=0.6, rule='proportional', maxiter=1,
switch_to_sparse_at=SPARSE_THRESH, use_conv_matrix=False,
track_Euclidean_deltas=False):
"""spread supra-threshold mass to neighbours until equilibrium reached
updates are simultaneous, iterations are capped at maxiter
'rule' can be 'proportional' or 'even'
'switch_to_sparse_at' and 'use_conv_matrix' influence speed but not
result
returns updated grid, convergence flag, vector of numbers of suprathreshold
cells for each iteration, runtime, [vector of Euclidean deltas]
"""
sh = input.shape
m, n = sh
nei_ind, rec_nc, rec_8 \
= SPREAD_CACHE[sh] if sh in SPREAD_CACHE else precomp(sh)
if rule == 'proportional':
def iteration(state, state_f):
em = state > threshold
nnz = em.sum()
if nnz == 0: # no change, send signal to quit
return nnz
elif nnz < em.size * switch_to_sparse_at: # use sparse code
ei = np.where(em.flat)[0]
excess = state_f[ei] - threshold
state_f[-1] = 0
exc_nei_sum = rec_nc[ei] * state_f[nei_ind[ei, :]].sum(axis=-1)
exc_nei_ind = np.unique(nei_ind[ei, :])
if exc_nei_ind[0] == -1:
exc_nei_ind = exc_nei_ind[1:]
nm = exc_nei_sum != 0
state_swap = state_f[exc_nei_ind]
state_f[exc_nei_ind] = 1
iadd_conv_ring((ei[nm], excess[nm] / exc_nei_sum[nm], sh),
state_f)
state_f[exc_nei_ind] *= state_swap
iadd_conv_ring((ei[~nm], excess[~nm], sh), state_f)
state_f[ei] -= excess
elif use_conv_matrix:
excess = np.where(em, state - threshold, 0)
state_f[-1] = 0
nei_sum = (rec_nc * state_f[nei_ind].sum(axis=-1)).reshape(sh)
nm = nei_sum != 0
pm = em & nm
exc_p = np.where(pm, excess, 0)
exc_p[pm] /= nei_sum[pm]
wei_nei_sum = iadd_conv_ring(exc_p, np.zeros_like(state_f))
state += state * wei_nei_sum[:-1].reshape(sh)
fm = em & ~nm
exc_f = np.where(fm, excess, 0)
iadd_conv_ring(exc_f, state_f)
state -= excess
else:
excess = np.where(em, state - threshold, 0)
nei_sum = conv_with_ring(state)
# must special case the event of all neighbours being zero
nm = nei_sum != 0
# these can be distributed proportionally:
pm = em & nm
# select, prenormalise by sum of masses of neighbours,...
exc_p = np.where(pm, excess, 0)
exc_p[pm] /= nei_sum[pm]
# ...spread to neighbours and scale
spread_p = state * conv_with_ring(exc_p)
# these can't be distributed proportionally (because all
# neighbours are zero); therefore fall back to even:
fm = em & ~nm
exc_f = np.where(fm, excess * rec_8, 0)
spread_f = conv_with_ring(exc_f)
state += spread_p + spread_f - excess
return nnz
elif rule == 'even':
def iteration(state, state_f):
em = state > threshold
nnz = em.sum()
if nnz == 0: # no change, send signal to quit
return nnz
elif nnz < em.size * switch_to_sparse_at: # use sparse code
ei = np.where(em.flat)[0]
excess = state_f[ei] - threshold
iadd_conv_ring((ei, excess, sh), state_f)
state_f[ei] -= excess
elif use_conv_matrix:
excess = np.where(em, state - threshold, 0)
iadd_conv_ring(excess, state_f)
state -= excess
else:
excess = np.where(em, state - threshold, 0)
# prenormalise by number of neighbours, and spread
spread = conv_with_ring(excess * rec_8)
state += spread - excess
return nnz
else:
raise ValueError('unknown rule: ' + rule)
# master loop
t0 = time.time()
out_f = np.empty((m*n + 1,))
out = out_f[:m*n]
out[:] = input.ravel()
out.shape = sh
nnz = []
if track_Euclidean_deltas:
last = input
E = []
for i in range(maxiter):
nnz.append(iteration(out, out_f))
if nnz[-1] == 0:
if track_Euclidean_deltas:
return out, True, nnz, time.time() - t0, E + [0]
return out, True, nnz, time.time() - t0
if track_Euclidean_deltas:
E.append(np.sqrt(((last-out)**2).sum()))
last = out.copy()
if track_Euclidean_deltas:
return out, False, nnz, time.time() - t0, E
return out, False, nnz, time.time() - t0
# tests
def check_simple():
A = np.zeros((6, 6))
A[[0, 1, 1, 4, 4], [0, 3, 5, 1, 5]] = 1.08
A[5, :] = 0.1 * np.arange(6)
print('original')
print(A)
for rule in ('proportional', 'even'):
print(rule)
for lb, ucm, st in (('convolution', False, 0.001),
('matrix', True, 0.001), ('sparse', True, 0.999)):
print(lb)
print(level(A, rule=rule, switch_to_sparse_at=st,
use_conv_matrix=ucm)[0])
def check_consistency(sh=(300, 400), n=20):
print("""Running consistency checks with different solvers
{} trials each {} x {} cells
""".format(n, *sh))
data = np.random.random((n,) + sh)
sums = data.sum(axis=(1, 2))
for th, lb in ((0.975, 'sparse'), (0.6, 'dense'),
(0.975, 'sparse'), (0.6, 'dense'),
(0.975, 'sparse'), (0.6, 'dense')):
times = np.zeros((2, 3))
for d, s in zip (data, sums):
for i, rule in enumerate(('proportional', 'even')):
results = []
for j, (ucm, st) in enumerate (
((False, 0.001), (True, 0.001), (True, 0.999))):
res, conv, nnz, time = level(
d, rule=rule, switch_to_sparse_at=st,
use_conv_matrix=ucm, threshold=th)
results.append(res)
times[i, j] += time
assert np.allclose(results[0], results[1])
assert np.allclose(results[1], results[2])
assert np.allclose(results[2], results[0])
assert np.allclose(s, [r.sum() for r in results])
print("""condition {} finished, no obvious errors; runtimes [sec]:
convolution matrix sparse solver
proportional {:13.7f} {:13.7f} {:13.7f}
even {:13.7f} {:13.7f} {:13.7f}
""".format(lb, *tuple(times.ravel())))
def check_convergence(sh=(300, 400), maxiter=100):
data = np.random.random(sh)
res, conv, nnz, time, Eucl = level(data, maxiter=maxiter,
track_Euclidean_deltas=True)
print('nnz:', nnz)
print('delta:', Eucl)
print('final length:', np.sqrt((res*res).sum()))
print('ratio:', Eucl[-1] / np.sqrt((res*res).sum()))
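For the 30 x 20 grid from the question, a call looks like this (a usage sketch; the unpacking matches the return values documented in level):

A = np.random.rand(30, 20)
out, converged, nnz_per_iter, runtime = level(A, threshold=0.6, rule='even', maxiter=100)
print(converged, out.max())            # converged is True once no cell exceeds 0.6
print(np.isclose(A.sum(), out.sum()))  # total mass is unchanged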
This solution finds the values to spread in each of the eight directions, then adds them together.
EDIT: I changed the functionality below to include either proportional or even weighting. The two are treated as the same calculation, with 'even' achieved by using all ones for the weighting instead of the input array.
EDIT: I changed the benchmarks to match @Paul's test case. This is faster than several of the cases mentioned there, but not the fastest if you're working with sparse matrices. A clear advantage of the following code is that you don't need a Ph.D. to understand and maintain it :) It directly does the requested operations using primarily numpy array slicing.
import numpy as np
import time
import sys
def main():
# Define parameters
numbers = np.random.rand(300, 400)
threshold = 0.6
max_iters = 1
n = 20
# Clock the evenly distributed total time for n calls
t0 = time.time()
for ctr in range(n):
s=spread( numbers, threshold, max_iters, rule='even' )
print('Evenly distributed: {:0.4f}s'.format(time.time()-t0))
# Evenly distributed: 0.2007s
# Clock the proportionally distributed total time for n calls
t0 = time.time()
for ctr in range(n):
s=spread( numbers, threshold, max_iters, rule='proportional' )
print('Proportionally distributed: {:0.4f}s'.format(time.time()-t0))
# Proportionally distributed: 0.2234s
def spread(numbers,threshold,max_iters=10, rule='even', first_call=True):
'''Spread the extra over a threshold among the adjacent values.'''
# This recursive function may go over the Python recursion limit!
if first_call==True :
if max_iters > 20000:
raise(ValueError('max_iters must be less than 20000, but got "{}"'.format(max_iters)))
elif max_iters > 900:
sys.setrecursionlimit(max_iters+1000)
else:
pass
n_rows = numbers.shape[0]
n_cols = numbers.shape[1]
# Find excess over threshold of each point
excess = np.maximum( numbers - threshold, np.zeros( numbers.shape ) )
# Find the value to base the weighting on
if rule == 'even':
up = np.ones((n_rows-1,n_cols))
down = np.ones((n_rows-1,n_cols))
left = np.ones((n_rows,n_cols-1))
right = np.ones((n_rows,n_cols-1))
up_left = np.ones((n_rows-1,n_cols-1))
up_right = np.ones((n_rows-1,n_cols-1))
down_left = np.ones((n_rows-1,n_cols-1))
down_right = np.ones((n_rows-1,n_cols-1))
elif rule == 'proportional':
up = numbers[1:,:]
down = numbers[:-1,:]
left = numbers[:,1:]
right = numbers[:,:-1]
up_left = numbers[1:,1:]
up_right = numbers[1:,:-1]
down_left = numbers[:-1,1:]
down_right = numbers[:-1,:-1]
else:
raise(ValueError('Invalid rule "{}"'.format(rule)))
# Find normalized weight in each direction
num_up = np.concatenate( (up,np.zeros((1,n_cols))), axis=0)
num_down = np.concatenate( (np.zeros((1,n_cols)),down), axis=0)
num_left = np.concatenate( (left,np.zeros((n_rows,1))), axis=1)
num_right = np.concatenate( (np.zeros((n_rows,1)),right), axis=1)
num_up_left = np.concatenate( (np.concatenate( (up_left,np.zeros((1,n_cols-1))), axis=0), np.zeros((n_rows,1))), axis=1)
num_up_right = np.concatenate( (np.zeros((n_rows,1)), np.concatenate( (up_right,np.zeros((1,n_cols-1))), axis=0)), axis=1)
num_down_left = np.concatenate( (np.concatenate( (np.zeros((1,n_cols-1)),down_left), axis=0), np.zeros((n_rows,1))), axis=1)
num_down_right = np.concatenate( (np.zeros((n_rows,1)), np.concatenate( (np.zeros((1,n_cols-1)),down_right), axis=0)), axis=1)
num_sum = num_up + num_down + num_left + num_right + num_up_left + num_up_right + num_down_left + num_down_right
up_weight = num_up / num_sum
down_weight = num_down / num_sum
left_weight = num_left / num_sum
right_weight = num_right / num_sum
up_left_weight = num_up_left / num_sum
up_right_weight = num_up_right / num_sum
down_left_weight = num_down_left / num_sum
down_right_weight = num_down_right / num_sum
# Set NaN values to zero
up_weight[np.isnan(up_weight)] = 0
down_weight[np.isnan(down_weight)] = 0
left_weight[np.isnan(left_weight)] = 0
right_weight[np.isnan(right_weight)] = 0
up_left_weight[np.isnan(up_left_weight)] = 0
up_right_weight[np.isnan(up_right_weight)] = 0
down_left_weight[np.isnan(down_left_weight)] = 0
down_right_weight[np.isnan(down_right_weight)] = 0
# Apply weight to the excess to find the contributions
up = (excess * up_weight)[:-1,:]
down = (excess * down_weight)[1:,:]
left = (excess * left_weight)[:,:-1]
right = (excess * right_weight)[:,1:]
up_left = (excess * up_left_weight)[:-1,:-1]
up_right = (excess * up_right_weight)[:-1,1:]
down_left = (excess * down_left_weight)[1:,:-1]
down_right = (excess * down_right_weight)[1:,1:]
# Pad with zeros
down = np.concatenate( (down,np.zeros((1,n_cols))), axis=0)
up = np.concatenate( (np.zeros((1,n_cols)),up), axis=0)
right = np.concatenate( (right,np.zeros((n_rows,1))), axis=1)
left = np.concatenate( (np.zeros((n_rows,1)),left), axis=1)
down_right = np.concatenate( (np.concatenate( (down_right,np.zeros((1,n_cols-1))), axis=0), np.zeros((n_rows,1))), axis=1)
down_left = np.concatenate( (np.zeros((n_rows,1)), np.concatenate( (down_left,np.zeros((1,n_cols-1))), axis=0)), axis=1)
up_right = np.concatenate( (np.concatenate( (np.zeros((1,n_cols-1)),up_right), axis=0), np.zeros((n_rows,1))), axis=1)
up_left = np.concatenate( (np.zeros((n_rows,1)), np.concatenate( (np.zeros((1,n_cols-1)),up_left), axis=0)), axis=1)
# Add the contributions to find the result
result = numbers - excess + up + down + left + right + up_left + up_right + down_left + down_right
if (np.amax(result) > threshold) and (max_iters > 1):
return spread(numbers=result,threshold=threshold,max_iters=max_iters-1,rule=rule,first_call=False)
else:
return result
if __name__ == '__main__':
main()
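And a quick conservation check for this spread-based version (a sketch; with a random grid the proportional rule's all-zero-neighbour corner case essentially never occurs):

A = np.random.rand(30, 20)
for rule in ('even', 'proportional'):
    out = spread(A, 0.6, max_iters=50, rule=rule)
    # total mass is preserved; the maximum approaches the 0.6 threshold
    print(rule, np.isclose(A.sum(), out.sum()), out.max())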
def disperse_peaks(a,thr,iter=10,prop=False):
n=a.shape
i=np.arange(n[0])
j=np.arange(n[1])
m=np.fmax(np.abs(i[:,None,None,None]-i[None,None,:,None]),np.abs(j[None,:,None,None]-j[None,None,None,:]))==1
if prop:
transf=a*m/np.sum(a*m,axis=(2,3))[:,:,None,None]
else:
transf=m/np.sum(m,axis=(2,3))[:,:,None,None]
b=a.copy()
idx=0
while np.max(b)>thr:
idx+=1
resid=np.where(b>thr,thr,b)
diff=b-resid
b=resid+np.einsum('ijkl,ij->kl',transf,diff)
if idx==iter:
break
return b
What this does: it builds a 4D transfer matrix that maps every cell to its immediate neighbours, normalised either evenly or by the neighbours' values (prop=True), and then repeatedly (up to iter times) updates the array by taking the part above the threshold, pushing that difference through the transfer matrix, and adding it back to the residual.
I did some testing and it isn't guaranteed to get to 0.6 exactly, no matter how many iterations. Not sure why (float comparison errors, probably), but it seems to work otherwise: the sum stays the same and max(disperse_peaks(a)) tends towards thr.
As for the second option, I'd need a bit more information about what type of weighting to give to surrounding numbers. I've done it linearly right now, but almost any distribution is possible.
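A usage sketch for disperse_peaks on the question's grid (the 4D transfer matrix is small enough at 30 x 20 to be cheap):

a = np.random.rand(30, 20)
b = disperse_peaks(a, 0.6, iter=50, prop=True)
print(np.isclose(a.sum(), b.sum()))   # the sum stays the same
print(b.max())                        # tends towards thr, though not exactly 0.6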