Redistributing excess values in numpy 2D array

I have the following numpy random 2D array:

np.random.rand(30, 20)

I want to iterate over each grid cell in the array. If the value of the grid cell is > 0.6, then I want to assign the excess to its immediate 8 neighboring cells (in case of corner grid cell, the number of neighboring cells will be fewer).

The excess should be redistributed as per one of 2 user-selected rules:

  1. Evenly amongst the 8 neighbors
  2. Proportional to the value in each neighbor i.e. a neighbor with higher value gets higher

Is there a way to do this in numpy without resorting to a for loop?

3 Answers

Ok, here's my take: In each step the algorithm detects all suprathreshold cells and simultaneously updates all these and all their neighbours either evenly or proprtionally; this is fully vectorised and comes in two implementations:

  • the generally faster one is based on linear convolution plus some trickery to conserve mass at the edges and corners;
  • the other one expresses the same operator as a sparse matrix, it is generally slower but I left it in because
  • it can handle sparse arguments and is therefore faster when the fraction of suprathreshold cells is low

Since this procedure does typically not converge in one step it is placed in a loop, however for all but the smallest grids its overhead should be minimal because its payload is substantial. The loop will terminate after a user supplied number of cycles or when there are no more suprathreshold units left. Optionally, it can record the Euclidean deltas between iterates.

A few words on the algorithm: if it weren't for the boundaries the even spreading operation could be described as subtracting the pattern of mass p that gets redistributed and then adding the same pattern convolved with a ring kernel k = [1 1 1; 1 0 1; 1 1 1] / 8; similarly, redistribution proportional to residual mass r can be written as

(1) r (k * (p / (k * r)))

where * is the convolution operator and multiplication and division are component wise. Parsing the formula we see that each point in p is first normalised by the average of the residual masses r * k over its 8 neighbours before it is spread to said neighbours (the other convolution) and scaled with the residual. The prenormalisation guarantees conservation of mass. In particular, it correctly normalises boundaries and corners. Building on this we see that the boundary problem of the even rule can be solved in a similarl fashion: by using (1) with r replaced with a sheet of ones.

Fun side note: With the proportional rule one can build non converging patterns. Here are two oscillators:

0.7  0  0.8  0  0.8  0             0   0   0   0
 0  0.6  0  0.6  0  0.6            0  1.0 0.6  0
0.8  0  1.0  0  1.0  0             0   0   0   0
 0  0.6  0  0.6  0  0.6

The code is below, rather long and technical I'm afraid but I tried to explain at least the main (fastest) branch; the main function is called level and there also are a few simple test functions.

There are a few print statements, but I think that's the only Python3 dependency.

import numpy as np
    from scipy import signal
    HAVE_SCIPY = True
except ImportError:
    HAVE_SCIPY = False
import time

USE_SCIPY = False # actually, numpy workaround is a bit faster

KERNEL = np.zeros((3, 3)) + 1/8
KERNEL[1, 1] = 0
def scipy_ring(a):
    """convolve 2d array a with kernel

    1/8 1/8 1/8
    1/8  0  1/8
    1/8 1/8 1/8
    return signal.convolve2d(a, KERNEL, mode='same')

def numpy_ring(a):
    """convolve 2d array a with kernel

    1/8 1/8 1/8
    1/8  0  1/8
    1/8 1/8 1/8
    tmp = a.copy()
    tmp[:, 1:] += a[:, :-1]
    tmp[:, :-1] += a[:, 1:]
    out = tmp.copy()
    out[1:, :] += tmp[:-1, :]
    out[:-1, :] += tmp[1:, :]
    return (out - a) / 8

    conv_with_ring = scipy_ring
    conv_with_ring = numpy_ring

# next is yet another implementation of convolution including edge correction.
# what for? it is most of the time much slower than the above but can handle
# sparse inputs and consequently is faster if the fraction of above-threshold
# cells is small (~5% or less)

def precomp(sh):
    """precompute sparse representation of operator mapping ravelled 2d
    array of shape sh to boundary corrected convolution with ring kernel

    1/8 1/8 1/8   /   1/5  0  1/5               0  1/3                \
    1/8  0  1/8   |   1/5 1/5 1/5   at edges,  1/3 1/3   at corners   |
    1/8 1/8 1/8   \                                                   /

    stored are
    - a table of flat indices encoding neighbours of the
      cell whose flat index equals the row no in the table
    - two scaled copies of the appropriate weights which
      equal 1 / neighbour count
    global SPREAD_CACHE
    m, n = sh
    # m*n grid points, each with up to 8 neighbours:
    # tedious but straighforward
    indices = np.empty((m*n, 8), dtype=int)
    indices[n-1:, 1] = np.arange(m*n - (n-1)) # NE
    indices[:-(n+1), 3] = np.arange(n+1, m*n) # SE
    indices[:-(n-1), 5] = np.arange(n-1, m*n) # SW
    indices[n+1:, 7] = np.arange(m*n - (n+1)) # NW
    indices[n:, 0] = np.arange((m-1)*n) # N
    indices[:n, [0, 1, 7]] = -1
    indices[:-1, 2] = np.arange(1, m*n) # E
    indices[n-1::n, 1:4] = -1
    indices[:-n, 4] = np.arange(n, m*n) # S
    indices[-n:, 3:6] = -1
    indices[1:, 6] = np.arange(m*n - 1) # W
    indices[::n, 5:] = -1
    # weights are constant along rows, therefore m*n vector suffices
    nei_count = (indices > -1).sum(axis=-1)
    weights = 1 / nei_count
    SPREAD_CACHE[sh] = indices, weights, 8 * weights.reshape(sh)
    return indices, weights, 8 * weights.reshape(sh)

def iadd_conv_ring(a, out):
    """add boundary corrected convolution of 2d array a with
    ring kernel

    1/8 1/8 1/8   /   1/5  0  1/5               0  1/3                \
    1/8  0  1/8   |   1/5 1/5 1/5   at edges,  1/3 1/3   at corners   |
    1/8 1/8 1/8   \                                                   /

    to out.

    IMPORTANT: out must be flat and have one spare field at its end
    which is used as a "/dev/NULL" by the indexing trickery

    if a is a tuple it is interpreted as a sparse representation of the
    form: (flat) indices, values, shape.
    if isinstance(a, tuple): # sparse
        ind, val, sh = a
        k_ind, k_wgt, __ \
            = SPREAD_CACHE[sh] if sh in SPREAD_CACHE else precomp(sh)
        np.add.at(out, k_ind[ind, :], k_wgt[ind, None]*val[:, None])
    else: # dense
        sh = a.shape
        k_ind, k_wgt, __ \
            = SPREAD_CACHE[sh] if sh in SPREAD_CACHE else precomp(sh)
        np.add.at(out, k_ind, k_wgt[:, None]*a.ravel()[:, None])
    return out

# main function
def level(input, threshold=0.6, rule='proportional', maxiter=1,
          switch_to_sparse_at=SPARSE_THRESH, use_conv_matrix=False,
    """spread supra-threshold mass to neighbours until equilibrium reached

    updates are simultaneous, iterations are capped at maxiter
    'rule' can be 'proportional' or 'even'
    'switch_to_sparse_at' and 'use_conv_matrix' influence speed but not

    returns updated grid, convergence flag, vector of numbers of supratheshold
    cells for each iteration, runtime, [vector of Euclidean deltas]
    sh = input.shape
    m, n = sh
    nei_ind, rec_nc, rec_8 \
        = SPREAD_CACHE[sh] if sh in SPREAD_CACHE else precomp(sh)
    if rule == 'proportional':
        def iteration(state, state_f):
            em = state > threshold
            nnz = em.sum()
            if nnz == 0: # no change, send signal to quit
                return nnz
            elif nnz < em.size * switch_to_sparse_at: # use sparse code
                ei = np.where(em.flat)[0]
                excess = state_f[ei] - threshold
                state_f[-1] = 0
                exc_nei_sum = rec_nc[ei] * state_f[nei_ind[ei, :]].sum(axis=-1)
                exc_nei_ind = np.unique(nei_ind[ei, :])
                if exc_nei_ind[0] == -1:
                    exc_nei_ind = exc_nei_ind[1:]
                nm = exc_nei_sum != 0
                state_swap = state_f[exc_nei_ind]
                state_f[exc_nei_ind] = 1
                iadd_conv_ring((ei[nm], excess[nm] / exc_nei_sum[nm], sh),
                state_f[exc_nei_ind] *= state_swap
                iadd_conv_ring((ei[~nm], excess[~nm], sh), state_f)
                state_f[ei] -= excess
            elif use_conv_matrix:
                excess = np.where(em, state - threshold, 0)
                state_f[-1] = 0
                nei_sum = (rec_nc * state_f[nei_ind].sum(axis=-1)).reshape(sh)
                nm = nei_sum != 0
                pm = em & nm
                exc_p = np.where(pm, excess, 0)
                exc_p[pm] /= nei_sum[pm]
                wei_nei_sum = iadd_conv_ring(exc_p, np.zeros_like(state_f))
                state += state * wei_nei_sum[:-1].reshape(sh)
                fm = em & ~nm
                exc_f = np.where(fm, excess, 0)
                iadd_conv_ring(exc_f, state_f)
                state -= excess
                excess = np.where(em, state - threshold, 0)
                nei_sum = conv_with_ring(state)
                # must special case the event of all neighbours being zero
                nm = nei_sum != 0
                # these can be distributed proportionally:
                pm = em & nm
                # select, prenormalise by sum of masses of neighbours,...
                exc_p = np.where(pm, excess, 0)
                exc_p[pm] /= nei_sum[pm]
                # ...spread to neighbours and scale
                spread_p = state * conv_with_ring(exc_p)
                # these can't be distributed proportionally (because all
                # neighbours are zero); therefore fall back to even:
                fm = em & ~nm
                exc_f = np.where(fm, excess * rec_8, 0)
                spread_f = conv_with_ring(exc_f)
                state += spread_p + spread_f - excess
            return nnz
    elif rule == 'even':
        def iteration(state, state_f):
            em = state > threshold
            nnz = em.sum()
            if nnz == 0: # no change, send signal to quit
                return nnz
            elif nnz < em.size * switch_to_sparse_at: # use sparse code
                ei = np.where(em.flat)[0]
                excess = state_f[ei] - threshold
                iadd_conv_ring((ei, excess, sh), state_f)
                state_f[ei] -= excess
            elif use_conv_matrix:
                excess = np.where(em, state - threshold, 0)
                iadd_conv_ring(excess, state_f)
                state -= excess
                excess = np.where(em, state - threshold, 0)
                # prenormalise by number of neighbours, and spread
                spread = conv_with_ring(excess * rec_8)
                state += spread - excess
            return nnz
        raise ValueError('unknown rule: ' + rule)

    # master loop
    t0 = time.time()
    out_f = np.empty((m*n + 1,))
    out = out_f[:m*n]
    out[:] = input.ravel()
    out.shape = sh
    nnz = []
    if track_Euclidean_deltas:
        last = input
        E = []
    for i in range(maxiter):
        nnz.append(iteration(out, out_f))
        if nnz[-1] == 0:
            if track_Euclidean_deltas:
                return out, True, nnz, time.time() - t0, E + [0]
            return out, True, nnz, time.time() - t0
        if track_Euclidean_deltas:
            last = out.copy()
    if track_Euclidean_deltas:
        return out, False, nnz, time.time() - t0, E
    return out, False, nnz, time.time() - t0

# tests

def check_simple():
    A = np.zeros((6, 6))
    A[[0, 1, 1, 4, 4], [0, 3, 5, 1, 5]] = 1.08
    A[5, :] = 0.1 * np.arange(6)
    for rule in ('proportional', 'even'):
        for lb, ucm, st in (('convolution', False, 0.001),
                            ('matrix', True, 0.001), ('sparse', True, 0.999)):
            print(level(A, rule=rule, switch_to_sparse_at=st,

def check_consistency(sh=(300, 400), n=20):
    print("""Running consistency checks with different solvers
{} trials each {} x {} cells

    """.format(n, *sh))
    data = np.random.random((n,) + sh)
    sums = data.sum(axis=(1, 2))
    for th, lb in ((0.975, 'sparse'), (0.6, 'dense'),
                   (0.975, 'sparse'), (0.6, 'dense'),
                   (0.975, 'sparse'), (0.6, 'dense')):
        times = np.zeros((2, 3))
        for d, s in zip (data, sums):
            for i, rule in enumerate(('proportional', 'even')):
                results = []
                for j, (ucm, st) in enumerate (
                        ((False, 0.001), (True, 0.001), (True, 0.999))):
                    res, conv, nnz, time = level(
                        d, rule=rule, switch_to_sparse_at=st,
                        use_conv_matrix=ucm, threshold=th)
                    times[i, j] += time
                assert np.allclose(results[0], results[1])
                assert np.allclose(results[1], results[2])
                assert np.allclose(results[2], results[0])
                assert np.allclose(s, [r.sum() for r in results])
        print("""condition {} finished, no obvious errors; runtimes [sec]:
                 convolution   matrix         sparse solver
proportional  {:13.7f}  {:13.7f}  {:13.7f}
even          {:13.7f}  {:13.7f}  {:13.7f}

""".format(lb, *tuple(times.ravel())))

def check_convergence(sh=(300, 400), maxiter=100):
    data = np.random.random(sh)
    res, conv, nnz, time, Eucl = level(data, maxiter=maxiter,
    print('nnz:', nnz)
    print('delta:', Eucl)
    print('final length:', np.sqrt((res*res).sum()))
    print('ratio:', Eucl[-1] / np.sqrt((res*res).sum()))
Paul Panzer

This solution finds the values to spread in each of the eight directions, then adds them together.

EDIT: I changed the functionality below to include either proportional or even weighting. These were treated as the same calculation, but with even being achieved when using all ones for the weighting instead of the input array.

EDIT: I changed the benchmarks to match @Paul's test case. This is faster than several of the cases mentioned there, but not the fastest if you're working with sparse matrices. A clear advantage of the following code is that you don't need a Ph.D to understand and maintain it :) It directly does the requested operations using primarily numpy array slicing.

import numpy as np
import time
import sys

def main():

    # Define parameters
    numbers = np.random.rand(300, 400)
    threshold = 0.6
    max_iters = 1
    n = 20

    # Clock the evenly distributed total time for n calls
    t0 = time.time()
    for ctr in range(n):
        s=spread( numbers, threshold, max_iters, rule='even' )
    print('Evenly distributed: {:0.4f}s'.format(time.time()-t0))
    # Evenly distributed: 0.2007s

    # Clock the proportionally distributed total time for n calls
    t0 = time.time()
    for ctr in range(n):
        s=spread( numbers, threshold, max_iters, rule='proportional' )
    print('Proportionally distributed: {:0.4f}s'.format(time.time()-t0))
    # Proportionally distributed: 0.2234s

def spread(numbers,threshold,max_iters=10, rule='even', first_call=True):
    '''Spread the extra over a threshold among the adjacent values.'''

    # This recursive function may go over the Python recursion limit!
    if first_call==True :
        if max_iters > 20000:
            raise(ValueError('max_iters must be less than 20000, but got "{}"'.format(max_iters)))
        elif max_iters > 900:

    n_rows = numbers.shape[0]
    n_cols = numbers.shape[1]

    # Find excess over threshold of each point
    excess = np.maximum( numbers - threshold, np.zeros( numbers.shape ) )

    # Find the value to base the weighting on
    if rule == 'even':
        up = np.ones((n_rows-1,n_cols))
        down = np.ones((n_rows-1,n_cols))
        left = np.ones((n_rows,n_cols-1))
        right = np.ones((n_rows,n_cols-1))
        up_left = np.ones((n_rows-1,n_cols-1))
        up_right = np.ones((n_rows-1,n_cols-1))
        down_left = np.ones((n_rows-1,n_cols-1))
        down_right = np.ones((n_rows-1,n_cols-1))

    elif rule == 'proportional':
        up = numbers[1:,:]
        down = numbers[:-1,:]
        left = numbers[:,1:]
        right = numbers[:,:-1]
        up_left = numbers[1:,1:]
        up_right = numbers[1:,:-1]
        down_left = numbers[:-1,1:]
        down_right = numbers[:-1,:-1]

        raise(ValueError('Invalid rule "{}"'.format(rule)))

    # Find normalized weight in each direction
    num_up = np.concatenate( (up,np.zeros((1,n_cols))), axis=0) 
    num_down = np.concatenate( (np.zeros((1,n_cols)),down), axis=0)
    num_left = np.concatenate( (left,np.zeros((n_rows,1))), axis=1)
    num_right = np.concatenate( (np.zeros((n_rows,1)),right), axis=1)
    num_up_left = np.concatenate( (np.concatenate( (up_left,np.zeros((1,n_cols-1))), axis=0), np.zeros((n_rows,1))), axis=1)
    num_up_right = np.concatenate( (np.zeros((n_rows,1)), np.concatenate( (up_right,np.zeros((1,n_cols-1))), axis=0)), axis=1)
    num_down_left = np.concatenate( (np.concatenate( (np.zeros((1,n_cols-1)),down_left), axis=0), np.zeros((n_rows,1))), axis=1)
    num_down_right = np.concatenate( (np.zeros((n_rows,1)), np.concatenate( (np.zeros((1,n_cols-1)),down_right), axis=0)), axis=1)
    num_sum = num_up + num_down + num_left + num_right + num_up_left + num_up_right + num_down_left + num_down_right
    up_weight = num_up / num_sum
    down_weight = num_down / num_sum
    left_weight = num_left / num_sum
    right_weight = num_right / num_sum
    up_left_weight = num_up_left / num_sum
    up_right_weight = num_up_right / num_sum
    down_left_weight = num_down_left / num_sum
    down_right_weight = num_down_right / num_sum

    # Set NaN values to zero
    up_weight[np.isnan(up_weight)] = 0
    down_weight[np.isnan(down_weight)] = 0
    left_weight[np.isnan(left_weight)] = 0
    right_weight[np.isnan(right_weight)] = 0
    up_left_weight[np.isnan(up_left_weight)] = 0
    up_right_weight[np.isnan(up_right_weight)] = 0
    down_left_weight[np.isnan(down_left_weight)] = 0
    down_right_weight[np.isnan(down_right_weight)] = 0

    # Apply weight to the excess to find the contributions
    up = (excess * up_weight)[:-1,:]
    down = (excess * down_weight)[1:,:]
    left = (excess * left_weight)[:,:-1]
    right = (excess * right_weight)[:,1:]
    up_left = (excess * up_left_weight)[:-1,:-1]
    up_right = (excess * up_right_weight)[:-1,1:]
    down_left = (excess * down_left_weight)[1:,:-1]
    down_right = (excess * down_right_weight)[1:,1:]

    # Pad with zeros
    down = np.concatenate( (down,np.zeros((1,n_cols))), axis=0) 
    up = np.concatenate( (np.zeros((1,n_cols)),up), axis=0)
    right = np.concatenate( (right,np.zeros((n_rows,1))), axis=1)
    left = np.concatenate( (np.zeros((n_rows,1)),left), axis=1)
    down_right = np.concatenate( (np.concatenate( (down_right,np.zeros((1,n_cols-1))), axis=0), np.zeros((n_rows,1))), axis=1)
    down_left = np.concatenate( (np.zeros((n_rows,1)), np.concatenate( (down_left,np.zeros((1,n_cols-1))), axis=0)), axis=1)
    up_right = np.concatenate( (np.concatenate( (np.zeros((1,n_cols-1)),up_right), axis=0), np.zeros((n_rows,1))), axis=1)
    up_left = np.concatenate( (np.zeros((n_rows,1)), np.concatenate( (np.zeros((1,n_cols-1)),up_left), axis=0)), axis=1)

    # Add the contributions to find the result 
    result = numbers - excess + up + down + left + right + up_left + up_right + down_left + down_right

    if (np.amax(result) > threshold) and (max_iters > 1):
        return spread(numbers=result,threshold=threshold,max_iters=max_iters-1,rule=rule,first_call=False)
        return result

if __name__ == '__main__':
def disperse_peaks(a,thr,iter=10,prop=False):
    if prop:
    while np.max(b)>thr:
        if idx==iter:
    return b

What this does:

  • Gets indices of 4D transform array
  • Creates boolean adjacency matrix by comparing indices (maximum difference is 1)
  • Divides boolean matrix by number of "True" in each slice to get weights (proportional or otherwise)
  • Updates (up to iter times) the matrix by taking the difference, transforming it, and adding to the residual

Did some testing and it isn't guaranteed to get to 0.6 exactly, no matter how many iterations. Not sure why (float comparison errors probably), but it seems to work otherwise. Sum stays the same and max(disperse_peaks(a)) tends towards thr.

As for the second option, I'd need a bit more information about what type of weighting to give to surrounding numbers. I've done it linearly right now, but almost any distribution is possible.

Daniel F