I need to quickly subtract each value in ushort arrayA from the value at the corresponding index in ushort arrayB; the two arrays have identical lengths.
In addition, if the difference is negative, I need to store a zero, not the negative difference.
(Length = 327680 to be exact, since I'm subtracting a 640x512 image from another image of identical size).
The code below is currently taking ~20ms and I'd like to get it down under ~5ms if possible. Unsafe code is ok, but please provide an example, as I'm not super-skilled at writing unsafe code.
Thank you!
public ushort[] Buffer { get; set; }
public void SubtractBackgroundFromBuffer(ushort[] backgroundBuffer)
{
System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
sw.Start();
int bufferLength = Buffer.Length;
for (int index = 0; index < bufferLength; index++)
{
int difference = Buffer[index] - backgroundBuffer[index];
if (difference >= 0)
Buffer[index] = (ushort)difference;
else
Buffer[index] = 0;
}
Debug.WriteLine("SubtractBackgroundFromBuffer(ms): " + sw.Elapsed.TotalMilliseconds.ToString("N2"));
}
UPDATE: While it's not strictly C#, for the benefit of others who read this, I finally ended up adding a C++ CLR Class Library to my solution with the following code. It runs in ~3.1ms. If an unmanaged C++ library is used, it runs in ~2.2ms. I decided to go with the managed library since the time difference was small.
// SpeedCode.h
#pragma once
using namespace System;
namespace SpeedCode
{
public ref class SpeedClass
{
public:
static void SpeedSubtractBackgroundFromBuffer(array<UInt16> ^ buffer, array<UInt16> ^ backgroundBuffer, int bufferLength);
};
}
// SpeedCode.cpp
// This is the main DLL file.
#include "stdafx.h"
#include "SpeedCode.h"
namespace SpeedCode
{
void SpeedClass::SpeedSubtractBackgroundFromBuffer(array<UInt16> ^ buffer, array<UInt16> ^ backgroundBuffer, int bufferLength)
{
for (int index = 0; index < bufferLength; index++)
{
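// The comparison (buffer[index] > backgroundBuffer[index]) converts to 1 or 0 in the
// multiplication, so the result is the difference when it is positive and 0 otherwise.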
buffer[index] = (UInt16)((buffer[index] - backgroundBuffer[index]) * (buffer[index] > backgroundBuffer[index]));
}
}
}
Then I call it like this:
public void SubtractBackgroundFromBuffer(ushort[] backgroundBuffer)
{
System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
sw.Start();
SpeedCode.SpeedClass.SpeedSubtractBackgroundFromBuffer(Buffer, backgroundBuffer, Buffer.Length);
Debug.WriteLine("SubtractBackgroundFromBuffer(ms): " + sw.Elapsed.TotalMilliseconds.ToString("N2"));
}
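For reference, the unmanaged alternative mentioned above boils down to exporting the same loop from a plain C++ DLL and calling it through P/Invoke. The sketch below is illustrative only; the DLL and export names are placeholders rather than the actual project's.
// Native side (plain C++), compiled into a hypothetical SpeedCodeNative.dll.
extern "C" __declspec(dllexport) void SpeedSubtractNative(
    unsigned short* buffer, const unsigned short* backgroundBuffer, int bufferLength)
{
    for (int i = 0; i < bufferLength; ++i)
    {
        // Clamp negative differences to zero, same as the managed versions.
        buffer[i] = buffer[i] > backgroundBuffer[i]
            ? (unsigned short)(buffer[i] - backgroundBuffer[i])
            : (unsigned short)0;
    }
}
// C# side (requires using System.Runtime.InteropServices). ushort[] is blittable,
// so the arrays are pinned and passed as raw pointers, and the writes made by the
// native code are visible to the caller when the call returns.
[DllImport("SpeedCodeNative.dll", CallingConvention = CallingConvention.Cdecl)]
static extern void SpeedSubtractNative(ushort[] buffer, ushort[] backgroundBuffer, int bufferLength);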
Some benchmarks.
1. SubtractBackgroundFromBuffer: this is the original method from the question.
2. SubtractBackgroundFromBufferWithCalcOpt: this is the original method augmented with TTat's idea for improving the calculation speed.
3. SubtractBackgroundFromBufferParallelFor: the solution from Selman22's answer.
4. SubtractBackgroundFromBufferBlockParallelFor: my answer. Similar to 3, but breaks the processing up into blocks of 4096 values.
5. SubtractBackgroundFromBufferPartitionedParallelForEach: Geoff's first answer.
6. SubtractBackgroundFromBufferPartitionedParallelForEachHack: Geoff's second answer.
Updates
Interestingly, I can get a small speed increase (~6%) for SubtractBackgroundFromBufferBlockParallelFor
by using (as suggested by Bruno Costa)
Buffer[i] = (ushort)Math.Max(difference, 0);
instead of
if (difference >= 0)
Buffer[i] = (ushort)difference;
else
Buffer[i] = 0;
Results
Note that this is the total time for the 1000 iterations in each run.
SubtractBackgroundFromBuffer(ms): 2,062.23
SubtractBackgroundFromBufferWithCalcOpt(ms): 2,245.42
SubtractBackgroundFromBufferParallelFor(ms): 4,021.58
SubtractBackgroundFromBufferBlockParallelFor(ms): 769.74
SubtractBackgroundFromBufferPartitionedParallelForEach(ms): 827.48
SubtractBackgroundFromBufferPartitionedParallelForEachHack(ms): 539.60
So it seems from those results that the best approach combines the calculation optimizations for a small gain and makes use of Parallel.For
to operate on chunks of the image. Your mileage will of course vary, and the performance of parallel code is sensitive to the CPU you are running on.
Test Harness
I ran this for each method in Release mode. I am starting and stopping the Stopwatch
this way to ensure only processing time is measured.
System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
ushort[] bgImg = GenerateRandomBuffer(327680, 818687447);
for (int i = 0; i < 1000; i++)
{
Buffer = GenerateRandomBuffer(327680, 128011992);
sw.Start();
SubtractBackgroundFromBuffer(bgImg);
sw.Stop();
}
Console.WriteLine("SubtractBackgroundFromBuffer(ms): " + sw.Elapsed.TotalMilliseconds.ToString("N2"));
public static ushort[] GenerateRandomBuffer(int size, int randomSeed)
{
ushort[] buffer = new ushort[size];
Random random = new Random(randomSeed);
for (int i = 0; i < size; i++)
{
buffer[i] = (ushort)random.Next(ushort.MinValue, ushort.MaxValue);
}
return buffer;
}
The Methods
public static void SubtractBackgroundFromBuffer(ushort[] backgroundBuffer)
{
int bufferLength = Buffer.Length;
for (int index = 0; index < bufferLength; index++)
{
int difference = Buffer[index] - backgroundBuffer[index];
if (difference >= 0)
Buffer[index] = (ushort)difference;
else
Buffer[index] = 0;
}
}
public static void SubtractBackgroundFromBufferWithCalcOpt(ushort[] backgroundBuffer)
{
int bufferLength = Buffer.Length;
for (int index = 0; index < bufferLength; index++)
{
if (Buffer[index] < backgroundBuffer[index])
{
Buffer[index] = 0;
}
else
{
Buffer[index] -= backgroundBuffer[index];
}
}
}
public static void SubtractBackgroundFromBufferParallelFor(ushort[] backgroundBuffer)
{
Parallel.For(0, Buffer.Length, (i) =>
{
int difference = Buffer[i] - backgroundBuffer[i];
if (difference >= 0)
Buffer[i] = (ushort)difference;
else
Buffer[i] = 0;
});
}
public static void SubtractBackgroundFromBufferBlockParallelFor(ushort[] backgroundBuffer)
{
int blockSize = 4096;
Parallel.For(0, (int)Math.Ceiling(Buffer.Length / (double)blockSize), (j) =>
{
for (int i = j * blockSize; i < (j + 1) * blockSize; i++)
{
int difference = Buffer[i] - backgroundBuffer[i];
Buffer[i] = (ushort)Math.Max(difference, 0);
}
});
}
public static void SubtractBackgroundFromBufferPartitionedParallelForEach(ushort[] backgroundBuffer)
{
Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
{
for (int i = range.Item1; i < range.Item2; ++i)
{
if (Buffer[i] < backgroundBuffer[i])
{
Buffer[i] = 0;
}
else
{
Buffer[i] -= backgroundBuffer[i];
}
}
});
}
public static void SubtractBackgroundFromBufferPartitionedParallelForEachHack(ushort[] backgroundBuffer)
{
Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
{
for (int i = range.Item1; i < range.Item2; ++i)
{
unsafe
{
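// Reinterprets the bool's memory as an int (1 for true, 0 for false) so the multiply
// zeroes out negative differences without a branch; see Geoff's answer below for the caveats.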
var nonNegative = Buffer[i] > backgroundBuffer[i];
Buffer[i] = (ushort)((Buffer[i] - backgroundBuffer[i]) *
*((int*)(&nonNegative)));
}
}
});
}
This is an interesting question.
Only performing the subtraction after testing that the result won't be negative (as suggested by TTat and Maximum Cookie) has negligible impact, as this optimization can already be performed by the JIT compiler.
Parallelizing the task (as suggested by Selman22) is a good idea, but when the loop is as fast as it is in this case, the overhead ends up outweighing the gains, so Selman22's implementation actually runs slower in my testing. I suspect that nick_w's benchmarks were produced with the debugger attached, hiding this fact.
Parallelizing the task in larger chunks (as suggested by nick_w) deals with the overhead problem, and can actually produce faster performance, but you don't have to calculate the chunks yourself - you can use Partitioner
to do this for you:
public static void SubtractBackgroundFromBufferPartitionedParallelForEach(
ushort[] backgroundBuffer)
{
Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
{
for (int i = range.Item1; i < range.Item2; ++i)
{
if (Buffer[i] < backgroundBuffer[i])
{
Buffer[i] = 0;
}
else
{
Buffer[i] -= backgroundBuffer[i];
}
}
});
}
The above method consistently outperforms nick_w's hand-rolled chunking in my testing.
But wait! There's more to it than that.
The real culprit in slowing down your code is not the assignment or arithmetic. It's the if statement, and how much it costs depends heavily on the nature of the data you are processing.
nick_w's benchmarking generates random data of the same magnitude for both buffers. However, it is very likely that your background buffer actually contains data of lower average magnitude. This detail can be significant due to branch prediction (as explained in this classic SO answer).
When the value in the background buffer is usually smaller than the value in the buffer, the CPU's branch predictor picks up on that pattern and the check becomes cheap. When the data in each buffer comes from the same random population, there is no way to guess the outcome of the if statement with greater than 50% accuracy. It is this latter scenario that nick_w is benchmarking, and under those conditions we could potentially optimize your method further by using unsafe code to convert a bool to an integer and avoid branching altogether. (Note that the following code relies on an implementation detail of how bools are represented in memory, and while it works for your scenario in .NET 4.5, it is not necessarily a good idea, and is shown here for illustrative purposes.)
public static void SubtractBackgroundFromBufferPartitionedParallelForEachHack(
ushort[] backgroundBuffer)
{
Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
{
for (int i = range.Item1; i < range.Item2; ++i)
{
unsafe
{
var nonNegative = Buffer[i] > backgroundBuffer[i];
Buffer[i] = (ushort)((Buffer[i] - backgroundBuffer[i]) *
*((int*)(&nonNegative)));
}
}
});
}
If you are really looking to shave a bit more time off, then you can follow this approach in a safer manner by switching language to C++/CLI, as that will let you use a boolean in an arithmetic expression without resorting to unsafe code:
UInt16 MyCppLib::Maths::SafeSubtraction(UInt16 minuend, UInt16 subtrahend)
{
return (UInt16)((minuend - subtrahend) * (minuend > subtrahend));
}
You can create a purely managed DLL using C++/CLI exposing the above static method, and then use it in your C# code:
public static void SubtractBackgroundFromBufferPartitionedParallelForEachCpp(
ushort[] backgroundBuffer)
{
Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
{
for (int i = range.Item1; i < range.Item2; ++i)
{
Buffer[i] =
MyCppLib.Maths.SafeSubtraction(Buffer[i], backgroundBuffer[i]);
}
});
}
This outperforms the hacky unsafe C# code above. In fact, it is so fast that you could write the whole method in C++/CLI, forget about parallelization, and still outperform the other techniques.
Using nick_w's test harness, the above method will outperform any of the other suggestions published here so far. Here are the results I get (1-4 are the cases he tried, and 5-7 are the ones outlined in this answer):
1. SubtractBackgroundFromBuffer(ms): 2,021.37
2. SubtractBackgroundFromBufferWithCalcOpt(ms): 2,125.80
3. SubtractBackgroundFromBufferParallelFor(ms): 3,431.58
4. SubtractBackgroundFromBufferBlockParallelFor(ms): 1,401.36
5. SubtractBackgroundFromBufferPartitionedParallelForEach(ms): 1,197.76
6. SubtractBackgroundFromBufferPartitionedParallelForEachHack(ms): 742.72
7. SubtractBackgroundFromBufferPartitionedParallelForEachCpp(ms): 499.27
However, in the scenario I expect you actually have, where background values are typically smaller, successful branch prediction improves results across the board, and the 'hack' to avoid the if statement is actually slower.
Here are the results I get using nick_w's test harness when I restrict values in the background buffer to the range 0-6500 (roughly 10% of the ushort range):
1. SubtractBackgroundFromBuffer(ms): 773.50
2. SubtractBackgroundFromBufferWithCalcOpt(ms): 915.91
3. SubtractBackgroundFromBufferParallelFor(ms): 2,458.36
4. SubtractBackgroundFromBufferBlockParallelFor(ms): 663.76
5. SubtractBackgroundFromBufferPartitionedParallelForEach(ms): 658.05
6. SubtractBackgroundFromBufferPartitionedParallelForEachHack(ms): 762.11
7. SubtractBackgroundFromBufferPartitionedParallelForEachCpp(ms): 494.12
You can see that results 1-5 have all dramatically improved since they are now benefiting from better branch prediction. Results 6 & 7 haven't changed much, since they have avoided branching.
This change in data completely changes things. In this scenario, even the fastest all-C# solution is now only 15% faster than your original code.
Bottom line: be sure to test any method you pick with representative data, or your results will be meaningless.
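For example, the low-magnitude background scenario above can be approximated in nick_w's harness by generating the background buffer over a narrower range. The maxValue parameter below is an illustrative addition to his GenerateRandomBuffer, not part of the original harness; substitute whatever range matches your real background data.
// Hypothetical variant of the harness helper with a configurable upper bound,
// so the background buffer can be kept small relative to the image buffer.
public static ushort[] GenerateRandomBuffer(int size, int randomSeed, int maxValue = ushort.MaxValue)
{
    ushort[] buffer = new ushort[size];
    Random random = new Random(randomSeed);
    for (int i = 0; i < size; i++)
    {
        buffer[i] = (ushort)random.Next(ushort.MinValue, maxValue);
    }
    return buffer;
}
// Full-range image buffer, background restricted to 0-6500 as in the second set of results.
ushort[] bgImg = GenerateRandomBuffer(327680, 818687447, 6500);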
You may get a minor performance increase by checking whether the result will be negative before performing the subtraction; if it will be, there is no need to subtract at all. Example:
if (Buffer[index] > backgroundBuffer[index])
Buffer[index] = (ushort)(Buffer[index] - backgroundBuffer[index]);
else
Buffer[index] = 0;