Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Python: How to make this color thresholding function more efficient

I wrote an adaptive color thresholding function in Python (because OpenCV's cv2.adaptiveThreshold didn't fit my needs) and it is way too slow. I've made it as efficient as I can, but it still takes almost 500 ms on a 1280x720 image.

I would greatly appreciate any suggestions that will make this function more efficient!

Here's what the function does: It uses a cross-shape of one-pixel thickness as the structuring element. For each pixel in the image, it computes the average values of ksize adjacent pixels in four directions independently (i.e. the average of ksize pixels in the same row to the left, in the same column above, in the same row to the right, and in the same column below). I end with four average values, one for each direction. A pixel only meets the threshold criterion if it is brighter than either both the left AND right averages or both the top AND bottom averages (plus some constant C).

I compute those averages incrementally for all pixels at the same time using numpy.roll(), but I still need to do this ksize times. The ksize will usually be 20-50.

Here is the code, the relevant part is really just what happens inside the for-loop:

def bilateral_adaptive_threshold(img, ksize=20, C=0, mode='floor', true_value=255, false_value=0):

    mask = np.full(img.shape, false_value, dtype=np.int16)

    left_thresh = np.zeros_like(img, dtype=np.float32) #Store the right-side average of each pixel here
    right_thresh = np.zeros_like(img, dtype=np.float32) #Store the left-side average of each pixel here
    up_thresh = np.zeros_like(img, dtype=np.float32) #Store the top-side average of each pixel here
    down_thresh = np.zeros_like(img, dtype=np.float32) #Store the bottom-side average of each pixel here

    for i in range(1, ksize+1): 
        roll_left = np.roll(img, -i, axis=1)
        roll_right = np.roll(img, i, axis=1)
        roll_up = np.roll(img, -i, axis=0)
        roll_down = np.roll(img, i, axis=0)

        roll_left[:,-i:] = 0
        roll_right[:,:i] = 0
        roll_up[-i:,:] = 0
        roll_down[:i,:] = 0

        left_thresh += roll_right
        right_thresh += roll_left
        up_thresh += roll_down
        down_thresh += roll_up

    left_thresh /= ksize
    right_thresh /= ksize
    up_thresh /= ksize
    down_thresh /= ksize

    if mode == 'floor':
        mask[((img > left_thresh+C) & (img > right_thresh+C)) | ((img > up_thresh+C) & (img > down_thresh+C))] = true_value
    elif mode == 'ceil':
        mask[((img < left_thresh-C) & (img < right_thresh-C)) | ((img < up_thresh-C) & (img < down_thresh-C))] = true_value
    else: raise ValueError("Unexpected mode value. Expected value is 'floor' or 'ceil'.")

    return mask
like image 988
Alex Avatar asked Mar 01 '17 19:03

Alex


People also ask

What is adaptive thresholding?

Adaptive thresholding is the method where the threshold value is calculated for smaller regions and therefore, there will be different threshold values for different regions. In OpenCV, you can perform Adaptive threshold operation on an image using the method adaptiveThreshold() of the Imgproc class.

How does OpenCV threshold work?

threshold is used to apply the thresholding. The first argument is the source image, which should be a grayscale image. The second argument is the threshold value which is used to classify the pixel values. The third argument is the maximum value which is assigned to pixel values exceeding the threshold.

How do I set threshold value in Python?

Wand threshold() function – Python The threshold() function is an inbuilt function in the Python Wand ImageMagick library which is used to modify the image such that any pixel's intensity value greater than the threshold is assigned the maximum intensity (white), or otherwise is assigned the minimum intensity (black).

What does cv2 threshold do?

We use the cv2. THRESH_BINARY_INV method, which indicates that pixel values p less than T are set to the output value (the third argument). The cv2. threshold function then returns a tuple of 2 values: the first, T, is the threshold value.


1 Answers

As you hint in your question, the dominant part of the function is obtaining the 4 arrays of sums needed to calculate the averages -- here it's on average 190ms out of 210ms for the whole function. So, let's focus on that.

First, necessary imports and a convenience timing function.

from timeit import default_timer as timer
import numpy as np
import cv2

## ===========================================================================

def time_fn(fn, img, ksize=20, iters=16):
    start = timer()
    for i in range(iters):
        fn(img, ksize)
    end = timer()
    return ((end - start) / iters) * 1000

## ===========================================================================
# Our test image
img = np.uint8(np.random.random((720,1280)) * 256)

Original Implementation

We can reduce your function in the following manner, so that it just calculates and returns the 4 arrays of sums. We can later use this to check that the optimized versions return the same result.

# Original code
def windowed_sum_v1(img, ksize=20):
    left_thresh = np.zeros_like(img, dtype=np.float32)
    right_thresh = np.zeros_like(img, dtype=np.float32)
    up_thresh = np.zeros_like(img, dtype=np.float32)
    down_thresh = np.zeros_like(img, dtype=np.float32)

    for i in range(1, ksize+1): 
        roll_left = np.roll(img, -i, axis=1)
        roll_right = np.roll(img, i, axis=1)
        roll_up = np.roll(img, -i, axis=0)
        roll_down = np.roll(img, i, axis=0)

        roll_left[:,-i:] = 0
        roll_right[:,:i] = 0
        roll_up[-i:,:] = 0
        roll_down[:i,:] = 0

        left_thresh += roll_right
        right_thresh += roll_left
        up_thresh += roll_down
        down_thresh += roll_up

    return (left_thresh, right_thresh, up_thresh, down_thresh)

Now we can find how much time this function takes on local machine:

>>> print "V1: %f ms" % time_fn(windowed_sum_v1, img, 20, 16)
V1: 188.572077 ms

Improvement #1

numpy.roll is bound to involve some overhead, but there's no need to dig into that here. Notice that after your roll the array, you zero out the rows or columns that spilled across the edge of the array. Then you add this to the accumulator. Adding a zero doesn't change the result, so we may as well avoid that. Instead, we can just add progressive smaller and appropriately offset slices of the whole array, avoiding the roll and (somewhat) reducing the total number of additions needed.

# Summing up ROIs
def windowed_sum_v2(img, ksize=20):
    h,w=(img.shape[0], img.shape[1])

    left_thresh = np.zeros_like(img, dtype=np.float32)
    right_thresh = np.zeros_like(img, dtype=np.float32)
    up_thresh = np.zeros_like(img, dtype=np.float32)
    down_thresh = np.zeros_like(img, dtype=np.float32)

    for i in range(1, ksize+1): 
        left_thresh[:,i:] += img[:,:w-i]
        right_thresh[:,:w-i] += img[:,i:]
        up_thresh[i:,:] += img[:h-i,:]
        down_thresh[:h-i,:] += img[i:,:]

    return (left_thresh, right_thresh, up_thresh, down_thresh)

Let's test this and time it:

>>> print "Results equal (V1 vs V2): %s" % (np.array_equal(windowed_sum_v1(img), windowed_sum_v2(img)))
Results equal (V1 vs V2): True
>>> print "V2: %f ms" % time_fn(windowed_sum_v2, img, 20, 16)
V2: 110.861794 ms

This implementation takes only 60% of the time of taken by the original. Can we do better?

Improvement #2

We still have a loop there. It would be nice if we could replace the repeated additions by a single call to some optimized function. One such function is cv2.filter2D, which calculates the following:

filter2D

We can create a kernel, such that the points we want to add have a weight of 1.0 and the point that the kernel is anchored on has a weight of 0.0.

For example, when ksize=8, we could use the following kernels and anchor positions.

Kernels for ksize=8

The function would then be as follows:

# Using filter2d
def windowed_sum_v3(img, ksize=20):
    kernel_l = np.array([[1.0] * (ksize) + [0.0]])
    kernel_r = np.array([[0.0] + [1.0] * (ksize)])
    kernel_u = np.array([[1.0]] * (ksize) + [[0.0]])
    kernel_d = np.array([[0.0]] + [[1.0]] * (ksize))

    left_thresh = cv2.filter2D(img, cv2.CV_32F, kernel_l, anchor=(ksize,0), borderType=cv2.BORDER_CONSTANT)
    right_thresh = cv2.filter2D(img, cv2.CV_32F, kernel_r, anchor=(0,0), borderType=cv2.BORDER_CONSTANT)
    up_thresh = cv2.filter2D(img, cv2.CV_32F, kernel_u, anchor=(0,ksize), borderType=cv2.BORDER_CONSTANT)
    down_thresh = cv2.filter2D(img, cv2.CV_32F, kernel_d, anchor=(0,0), borderType=cv2.BORDER_CONSTANT)

    return (left_thresh, right_thresh, up_thresh, down_thresh)

Again, let's test time this function:

>>> print "Results equal (V1 vs V3): %s" % (np.array_equal(windowed_sum_v1(img), windowed_sum_v3(img)))
Results equal (V1 vs V3): True
>>> print "V2: %f ms" % time_fn(windowed_sum_v3, img, 20, 16)
V3: 46.652996 ms

We're down to 25% of the original time.

Improvement #3

We're working in floating point, but right now we don't do any divisions, and the kernel contains only ones and zeros. That means we could potentially work with integers. You mention the maximum window size is 50, which means we're safe with 16 bit signed integers. Integer math tends to be faster, and if the code we use is properly vectorized, we can potentially process twice at once. Let's give this a shot, and let's also provide a wrapper that returns the result in floating point format as the previous versions.

# Integer only
def windowed_sum_v4(img, ksize=20):
    kernel_l = np.array([[1] * (ksize) + [0]], dtype=np.int16)
    kernel_r = np.array([[0] + [1] * (ksize)], dtype=np.int16)
    kernel_u = np.array([[1]] * (ksize) + [[0]], dtype=np.int16)
    kernel_d = np.array([[0]] + [[1]] * (ksize), dtype=np.int16)

    left_thresh = cv2.filter2D(img, cv2.CV_16S, kernel_l, anchor=(ksize,0), borderType=cv2.BORDER_CONSTANT)
    right_thresh = cv2.filter2D(img, cv2.CV_16S, kernel_r, anchor=(0,0), borderType=cv2.BORDER_CONSTANT)
    up_thresh = cv2.filter2D(img, cv2.CV_16S, kernel_u, anchor=(0,ksize), borderType=cv2.BORDER_CONSTANT)
    down_thresh = cv2.filter2D(img, cv2.CV_16S, kernel_d, anchor=(0,0), borderType=cv2.BORDER_CONSTANT)

    return (left_thresh, right_thresh, up_thresh, down_thresh)

# Integer only, but returning floats    
def windowed_sum_v5(img, ksize=20):
    result = windowed_sum_v4(img, ksize)
    return map(np.float32,result)

Let's test this out.

>>> print "Results equal (V1 vs V4): %s" % (np.array_equal(windowed_sum_v1(img), windowed_sum_v4(img)))
Results equal (V1 vs V4): True
>>> print "Results equal (V1 vs V5): %s" % (np.array_equal(windowed_sum_v1(img), windowed_sum_v5(img)))
Results equal (V1 vs V5): True
>>> print "V4: %f ms" % time_fn(windowed_sum_v4, img, 20, 16)
V4: 14.712223 ms
>>> print "V5: %f ms" % time_fn(windowed_sum_v5, img, 20, 16)
V5: 20.859744 ms

We're down to 7% if we're fine with 16 bit integers, or to 10% if we want floats.

Further Improvements

Let's get back to the full thresholding function you wrote. Instead of dividing the sums as a separate step to obtain the average, we can scale the kernels such that filter2D returns the mean directly. This is only a small improvement (~3%).

Similarly, you can replace the addition or subtraction of C, by providing an appropriate delta for the filter2D call. This again shaves off a few percent.

N.B.: You might encounter few differences arising due to the limits of floating point representation if you implement the two above-mentioned changes.

Another possibility is to make the comparisons required to determine the mask to be comparisons of matrix vs scalar:

input < threshold
input - input < threshold - input
0 < threshold - input
0 < adjusted_threshold            # determined using adjusted kernel

We can achieve this by modifying the kernels to subtract the value of anchor pixel scaled by an appropriate weight (ksize). With numpy, this seems to make only tiny difference, although the way I understand it, we could potentially save half the reads in that part of the algorithm (while filter2D presumably still reads and multiplies the corresponding values, even if the weight is 0).

Fastest Implementation of the Threshold Function

Taking all that in consideration, we can rewrite your function like this, and get the same result in ~12.5% time as the original:

def bilateral_adaptive_threshold5(img, ksize=20, C=0, mode='floor', true_value=255, false_value=0):
    mask = np.full(img.shape, false_value, dtype=np.uint8)

    kernel_l = np.array([[1] * (ksize) + [-ksize]], dtype=np.int16)
    kernel_r = np.array([[-ksize] + [1] * (ksize)], dtype=np.int16)
    kernel_u = np.array([[1]] * (ksize) + [[-ksize]], dtype=np.int16)
    kernel_d = np.array([[-ksize]] + [[1]] * (ksize), dtype=np.int16)

    if mode == 'floor':
        delta = C * ksize
    elif mode == 'ceil':
        delta = -C * ksize
    else: raise ValueError("Unexpected mode value. Expected value is 'floor' or 'ceil'.")

    left_thresh = cv2.filter2D(img, cv2.CV_16S, kernel_l, anchor=(ksize,0), delta=delta, borderType=cv2.BORDER_CONSTANT)
    right_thresh = cv2.filter2D(img, cv2.CV_16S, kernel_r, anchor=(0,0), delta=delta, borderType=cv2.BORDER_CONSTANT)
    up_thresh = cv2.filter2D(img, cv2.CV_16S, kernel_u, anchor=(0,ksize), delta=delta, borderType=cv2.BORDER_CONSTANT)
    down_thresh = cv2.filter2D(img, cv2.CV_16S, kernel_d, anchor=(0,0), delta=delta, borderType=cv2.BORDER_CONSTANT)

    if mode == 'floor':
        mask[((0 > left_thresh) & (0 > right_thresh)) | ((0 > up_thresh) & (0 > down_thresh))] = true_value
    elif mode == 'ceil':
        mask[((0 < left_thresh) & (0 < right_thresh)) | ((0 < up_thresh) & (0 < down_thresh))] = true_value

    return mask
like image 81
Dan Mašek Avatar answered Nov 14 '22 05:11

Dan Mašek