I wrote an adaptive color thresholding function in Python (because OpenCV's cv2.adaptiveThreshold didn't fit my needs) and it is way too slow. I've made it as efficient as I can, but it still takes almost 500 ms on a 1280x720 image.
I would greatly appreciate any suggestions that will make this function more efficient!
Here's what the function does: It uses a cross-shape of one-pixel thickness as the structuring element. For each pixel in the image, it computes the average values of ksize
adjacent pixels in four directions independently (i.e. the average of ksize
pixels in the same row to the left, in the same column above, in the same row to the right, and in the same column below). I end with four average values, one for each direction. A pixel only meets the threshold criterion if it is brighter than either both the left AND right averages or both the top AND bottom averages (plus some constant C
).
I compute those averages incrementally for all pixels at the same time using numpy.roll()
, but I still need to do this ksize
times. The ksize
will usually be 20-50.
Here is the code, the relevant part is really just what happens inside the for-loop:
def bilateral_adaptive_threshold(img, ksize=20, C=0, mode='floor', true_value=255, false_value=0):
mask = np.full(img.shape, false_value, dtype=np.int16)
left_thresh = np.zeros_like(img, dtype=np.float32) #Store the right-side average of each pixel here
right_thresh = np.zeros_like(img, dtype=np.float32) #Store the left-side average of each pixel here
up_thresh = np.zeros_like(img, dtype=np.float32) #Store the top-side average of each pixel here
down_thresh = np.zeros_like(img, dtype=np.float32) #Store the bottom-side average of each pixel here
for i in range(1, ksize+1):
roll_left = np.roll(img, -i, axis=1)
roll_right = np.roll(img, i, axis=1)
roll_up = np.roll(img, -i, axis=0)
roll_down = np.roll(img, i, axis=0)
roll_left[:,-i:] = 0
roll_right[:,:i] = 0
roll_up[-i:,:] = 0
roll_down[:i,:] = 0
left_thresh += roll_right
right_thresh += roll_left
up_thresh += roll_down
down_thresh += roll_up
left_thresh /= ksize
right_thresh /= ksize
up_thresh /= ksize
down_thresh /= ksize
if mode == 'floor':
mask[((img > left_thresh+C) & (img > right_thresh+C)) | ((img > up_thresh+C) & (img > down_thresh+C))] = true_value
elif mode == 'ceil':
mask[((img < left_thresh-C) & (img < right_thresh-C)) | ((img < up_thresh-C) & (img < down_thresh-C))] = true_value
else: raise ValueError("Unexpected mode value. Expected value is 'floor' or 'ceil'.")
return mask
Adaptive thresholding is the method where the threshold value is calculated for smaller regions and therefore, there will be different threshold values for different regions. In OpenCV, you can perform Adaptive threshold operation on an image using the method adaptiveThreshold() of the Imgproc class.
threshold is used to apply the thresholding. The first argument is the source image, which should be a grayscale image. The second argument is the threshold value which is used to classify the pixel values. The third argument is the maximum value which is assigned to pixel values exceeding the threshold.
Wand threshold() function – Python The threshold() function is an inbuilt function in the Python Wand ImageMagick library which is used to modify the image such that any pixel's intensity value greater than the threshold is assigned the maximum intensity (white), or otherwise is assigned the minimum intensity (black).
We use the cv2. THRESH_BINARY_INV method, which indicates that pixel values p less than T are set to the output value (the third argument). The cv2. threshold function then returns a tuple of 2 values: the first, T, is the threshold value.
As you hint in your question, the dominant part of the function is obtaining the 4 arrays of sums needed to calculate the averages -- here it's on average 190ms out of 210ms for the whole function. So, let's focus on that.
First, necessary imports and a convenience timing function.
from timeit import default_timer as timer
import numpy as np
import cv2
## ===========================================================================
def time_fn(fn, img, ksize=20, iters=16):
start = timer()
for i in range(iters):
fn(img, ksize)
end = timer()
return ((end - start) / iters) * 1000
## ===========================================================================
# Our test image
img = np.uint8(np.random.random((720,1280)) * 256)
We can reduce your function in the following manner, so that it just calculates and returns the 4 arrays of sums. We can later use this to check that the optimized versions return the same result.
# Original code
def windowed_sum_v1(img, ksize=20):
left_thresh = np.zeros_like(img, dtype=np.float32)
right_thresh = np.zeros_like(img, dtype=np.float32)
up_thresh = np.zeros_like(img, dtype=np.float32)
down_thresh = np.zeros_like(img, dtype=np.float32)
for i in range(1, ksize+1):
roll_left = np.roll(img, -i, axis=1)
roll_right = np.roll(img, i, axis=1)
roll_up = np.roll(img, -i, axis=0)
roll_down = np.roll(img, i, axis=0)
roll_left[:,-i:] = 0
roll_right[:,:i] = 0
roll_up[-i:,:] = 0
roll_down[:i,:] = 0
left_thresh += roll_right
right_thresh += roll_left
up_thresh += roll_down
down_thresh += roll_up
return (left_thresh, right_thresh, up_thresh, down_thresh)
Now we can find how much time this function takes on local machine:
>>> print "V1: %f ms" % time_fn(windowed_sum_v1, img, 20, 16)
V1: 188.572077 ms
numpy.roll
is bound to involve some overhead, but there's no need to dig into that here. Notice that after your roll the array, you zero out the rows or columns that spilled across the edge of the array. Then you add this to the accumulator. Adding a zero doesn't change the result, so we may as well avoid that. Instead, we can just add progressive smaller and appropriately offset slices of the whole array, avoiding the roll
and (somewhat) reducing the total number of additions needed.
# Summing up ROIs
def windowed_sum_v2(img, ksize=20):
h,w=(img.shape[0], img.shape[1])
left_thresh = np.zeros_like(img, dtype=np.float32)
right_thresh = np.zeros_like(img, dtype=np.float32)
up_thresh = np.zeros_like(img, dtype=np.float32)
down_thresh = np.zeros_like(img, dtype=np.float32)
for i in range(1, ksize+1):
left_thresh[:,i:] += img[:,:w-i]
right_thresh[:,:w-i] += img[:,i:]
up_thresh[i:,:] += img[:h-i,:]
down_thresh[:h-i,:] += img[i:,:]
return (left_thresh, right_thresh, up_thresh, down_thresh)
Let's test this and time it:
>>> print "Results equal (V1 vs V2): %s" % (np.array_equal(windowed_sum_v1(img), windowed_sum_v2(img)))
Results equal (V1 vs V2): True
>>> print "V2: %f ms" % time_fn(windowed_sum_v2, img, 20, 16)
V2: 110.861794 ms
This implementation takes only 60% of the time of taken by the original. Can we do better?
We still have a loop there. It would be nice if we could replace the repeated additions by a single call to some optimized function. One such function is cv2.filter2D
, which calculates the following:
We can create a kernel, such that the points we want to add have a weight of 1.0
and the point that the kernel is anchored on has a weight of 0.0
.
For example, when ksize=8
, we could use the following kernels and anchor positions.
The function would then be as follows:
# Using filter2d
def windowed_sum_v3(img, ksize=20):
kernel_l = np.array([[1.0] * (ksize) + [0.0]])
kernel_r = np.array([[0.0] + [1.0] * (ksize)])
kernel_u = np.array([[1.0]] * (ksize) + [[0.0]])
kernel_d = np.array([[0.0]] + [[1.0]] * (ksize))
left_thresh = cv2.filter2D(img, cv2.CV_32F, kernel_l, anchor=(ksize,0), borderType=cv2.BORDER_CONSTANT)
right_thresh = cv2.filter2D(img, cv2.CV_32F, kernel_r, anchor=(0,0), borderType=cv2.BORDER_CONSTANT)
up_thresh = cv2.filter2D(img, cv2.CV_32F, kernel_u, anchor=(0,ksize), borderType=cv2.BORDER_CONSTANT)
down_thresh = cv2.filter2D(img, cv2.CV_32F, kernel_d, anchor=(0,0), borderType=cv2.BORDER_CONSTANT)
return (left_thresh, right_thresh, up_thresh, down_thresh)
Again, let's test time this function:
>>> print "Results equal (V1 vs V3): %s" % (np.array_equal(windowed_sum_v1(img), windowed_sum_v3(img)))
Results equal (V1 vs V3): True
>>> print "V2: %f ms" % time_fn(windowed_sum_v3, img, 20, 16)
V3: 46.652996 ms
We're down to 25% of the original time.
We're working in floating point, but right now we don't do any divisions, and the kernel contains only ones and zeros. That means we could potentially work with integers. You mention the maximum window size is 50, which means we're safe with 16 bit signed integers. Integer math tends to be faster, and if the code we use is properly vectorized, we can potentially process twice at once. Let's give this a shot, and let's also provide a wrapper that returns the result in floating point format as the previous versions.
# Integer only
def windowed_sum_v4(img, ksize=20):
kernel_l = np.array([[1] * (ksize) + [0]], dtype=np.int16)
kernel_r = np.array([[0] + [1] * (ksize)], dtype=np.int16)
kernel_u = np.array([[1]] * (ksize) + [[0]], dtype=np.int16)
kernel_d = np.array([[0]] + [[1]] * (ksize), dtype=np.int16)
left_thresh = cv2.filter2D(img, cv2.CV_16S, kernel_l, anchor=(ksize,0), borderType=cv2.BORDER_CONSTANT)
right_thresh = cv2.filter2D(img, cv2.CV_16S, kernel_r, anchor=(0,0), borderType=cv2.BORDER_CONSTANT)
up_thresh = cv2.filter2D(img, cv2.CV_16S, kernel_u, anchor=(0,ksize), borderType=cv2.BORDER_CONSTANT)
down_thresh = cv2.filter2D(img, cv2.CV_16S, kernel_d, anchor=(0,0), borderType=cv2.BORDER_CONSTANT)
return (left_thresh, right_thresh, up_thresh, down_thresh)
# Integer only, but returning floats
def windowed_sum_v5(img, ksize=20):
result = windowed_sum_v4(img, ksize)
return map(np.float32,result)
Let's test this out.
>>> print "Results equal (V1 vs V4): %s" % (np.array_equal(windowed_sum_v1(img), windowed_sum_v4(img)))
Results equal (V1 vs V4): True
>>> print "Results equal (V1 vs V5): %s" % (np.array_equal(windowed_sum_v1(img), windowed_sum_v5(img)))
Results equal (V1 vs V5): True
>>> print "V4: %f ms" % time_fn(windowed_sum_v4, img, 20, 16)
V4: 14.712223 ms
>>> print "V5: %f ms" % time_fn(windowed_sum_v5, img, 20, 16)
V5: 20.859744 ms
We're down to 7% if we're fine with 16 bit integers, or to 10% if we want floats.
Let's get back to the full thresholding function you wrote. Instead of dividing the sums as a separate step to obtain the average, we can scale the kernels such that filter2D
returns the mean directly. This is only a small improvement (~3%).
Similarly, you can replace the addition or subtraction of C
, by providing an appropriate delta
for the filter2D
call. This again shaves off a few percent.
N.B.: You might encounter few differences arising due to the limits of floating point representation if you implement the two above-mentioned changes.
Another possibility is to make the comparisons required to determine the mask to be comparisons of matrix vs scalar:
input < threshold
input - input < threshold - input
0 < threshold - input
0 < adjusted_threshold # determined using adjusted kernel
We can achieve this by modifying the kernels to subtract the value of anchor pixel scaled by an appropriate weight (ksize
). With numpy, this seems to make only tiny difference, although the way I understand it, we could potentially save half the reads in that part of the algorithm (while filter2D
presumably still reads and multiplies the corresponding values, even if the weight is 0).
Taking all that in consideration, we can rewrite your function like this, and get the same result in ~12.5% time as the original:
def bilateral_adaptive_threshold5(img, ksize=20, C=0, mode='floor', true_value=255, false_value=0):
mask = np.full(img.shape, false_value, dtype=np.uint8)
kernel_l = np.array([[1] * (ksize) + [-ksize]], dtype=np.int16)
kernel_r = np.array([[-ksize] + [1] * (ksize)], dtype=np.int16)
kernel_u = np.array([[1]] * (ksize) + [[-ksize]], dtype=np.int16)
kernel_d = np.array([[-ksize]] + [[1]] * (ksize), dtype=np.int16)
if mode == 'floor':
delta = C * ksize
elif mode == 'ceil':
delta = -C * ksize
else: raise ValueError("Unexpected mode value. Expected value is 'floor' or 'ceil'.")
left_thresh = cv2.filter2D(img, cv2.CV_16S, kernel_l, anchor=(ksize,0), delta=delta, borderType=cv2.BORDER_CONSTANT)
right_thresh = cv2.filter2D(img, cv2.CV_16S, kernel_r, anchor=(0,0), delta=delta, borderType=cv2.BORDER_CONSTANT)
up_thresh = cv2.filter2D(img, cv2.CV_16S, kernel_u, anchor=(0,ksize), delta=delta, borderType=cv2.BORDER_CONSTANT)
down_thresh = cv2.filter2D(img, cv2.CV_16S, kernel_d, anchor=(0,0), delta=delta, borderType=cv2.BORDER_CONSTANT)
if mode == 'floor':
mask[((0 > left_thresh) & (0 > right_thresh)) | ((0 > up_thresh) & (0 > down_thresh))] = true_value
elif mode == 'ceil':
mask[((0 < left_thresh) & (0 < right_thresh)) | ((0 < up_thresh) & (0 < down_thresh))] = true_value
return mask
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With