I have an exponential moving average that gets called millions of times, and thus is the most expensive part of my code:
double _exponential(double price[], double smoothingValue, int dataSetSize)
{
    int i;
    double cXAvg;

    cXAvg = price[dataSetSize - 2];
    for (i = dataSetSize - 2; i > -1; --i)
        cXAvg += (smoothingValue * (price[i] - cXAvg));
    return (cXAvg);
}
Is there a more efficient way to code this to speed things up? I have a multi-threaded app and am using Visual C++.
Thank you.
Ouch!
Sure, multithreading can help. But you can almost assuredly improve the performance on a single threaded machine.
First, you are calculating it in the wrong direction. Only the most modern machines can do negative-stride prefetching; nearly all machines are faster with unit strides. I.e. changing the direction of the array so that you scan from low to high rather than high to low is almost always better.
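For example, the same EMA becomes a forward, unit-stride scan if the array is stored oldest-first (a sketch of mine; note this assumes the storage order is flipped relative to the question's array):

```cpp
#include <cstddef>

// Sketch: same EMA as the question, but assuming price[] is stored
// oldest-first, so the scan runs low-to-high with a unit stride.
double ema_forward(const double* price, double s, std::size_t n)
{
    double avg = price[0];                // seed with the oldest price
    for (std::size_t i = 1; i < n; ++i)   // forward, unit stride
        avg += s * (price[i] - avg);
    return avg;
}
```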
Next, rewriting a bit - please allow me to shorten the variable names to make it easier to type:
avg = price[0]
for i = 1 to N-1
    avg += s * (price[i] - avg)
By the way, I will start using shorthands p for price and s for smoothing, to save typing. I'm lazy...
avg0 = p0
avg1 = s*p1 + c*avg0
avg2 = s*p2 + c*avg1 = s*p2 + c*s*p1 + c*c*avg0
avg3 = s*p3 + c*avg2 = s*p3 + c*s*p2 + c*c*avg1
and, in general
avg[i] = s*p[i] + c*s*p[i-1] + c*c*avg[i-2]
Here c = 1 - s, since avg += s*(p - avg) is the same as avg = s*p + c*avg.
You might do
avg[i] = s*p[i] + c*(s*p[i-1] + c*avg[i-2])
but it is probably faster to precalculate cs = c*s and cc = c*c and do
avg[i] = (s*p[i] + cs*p[i-1]) + cc*avg[i-2]
The latency between avg[i] and avg[i-2] is then one multiply and one add covering two data points, rather than a multiply and an add per point between avg[i] and avg[i-1]. I.e. nearly twice as fast.
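To make the algebra concrete, here is a quick numerical check (my own sketch, not the answer's code) that combining two steps of the questioner's recurrence avg += s*(p - avg), i.e. avg = s*p + c*avg with c = 1 - s, gives the same result as the plain loop:

```cpp
#include <cstddef>
#include <vector>

// Reference: the question's one-step recurrence.
double ema_naive(const std::vector<double>& p, double s)
{
    double avg = p[0];
    for (std::size_t i = 1; i < p.size(); ++i)
        avg += s * (p[i] - avg);
    return avg;
}

// Two steps combined: avg[i] = s*p[i] + c*s*p[i-1] + c*c*avg[i-2],
// with c = 1 - s, and the cross terms precalculated.
double ema_two_step(const std::vector<double>& p, double s)
{
    const double c  = 1.0 - s;
    const double cs = c * s;    // precalculated cross term
    const double cc = c * c;    // precalculated two-step decay
    double avg = p[0];
    std::size_t i = 1;
    if (p.size() % 2 == 0) {    // odd number of remaining steps:
        avg = s * p[1] + c * avg;   // peel one plain step first
        i = 2;
    }
    for (; i + 1 < p.size(); i += 2)
        avg = (s * p[i + 1] + cs * p[i]) + cc * avg;
    return avg;
}
```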
In general, you want to rewrite the recurrence so that avg[i] is calculated in terms of avg[j]
for j as far back as you can possibly go, without filling up the machine, either execution units or registers.
You are basically doing more multiplies overall, in order to get fewer chains of multiples (and subtracts) on the critical path.
Skipping from avg[i-2] to avg[i] is easy; you can probably do three- and four-step versions too. Exactly how far depends on what your machine is, and how many registers you have.
And on the latency of the floating point adder and multiplier. Or, better yet, the flavour of combined multiply-add instruction you have - all modern machines have them. E.g. if the MADD or MSUB is 7 cycles long, you can do up to 6 other calculations in its shadow, even if you have only a single, fully pipelined floating point unit. Less if it can only issue every other cycle, as is common for double precision on older chips and GPUs. The assembly code should be software pipelined so that different loop iterations overlap. A good compiler should do that for you, but you might have to rewrite the C code to get the best performance.
By the way: I do NOT mean to suggest that you should be creating an array of avg[]. Instead, you would need two averages if avg[i] is calculated in terms of avg[i-2], and so on. You can use an array of avg[i] if you want, but I think that you only need to have 2 or 4 avgs, called, creatively, avg0 and avg1 (2, 3...), and "rotate" them.
avg0 = p0                    /* avg[0] */
avg1 = s*p1 + c*avg0         /* avg[1] */
for i from 2 to N-1 by 2 do
    avg0 = (s*p[i]   + cs*p[i-1]) + cc*avg0   /* avg[i],   reusing avg0 */
    avg1 = (s*p[i+1] + cs*p[i])   + cc*avg1   /* avg[i+1], reusing avg1 */
This sort of trick, splitting an accumulator or average into two or more, combining multiple stages of the recurrence, is common in high performance code.
Oh, yes: precalculate cs = c*s and cc = c*c, etc.
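Putting the rotation together, a minimal C++ sketch (mine, assuming the question's recurrence avg += s*(p - avg), i.e. avg = s*p + c*avg with c = 1 - s, and at least two data points):

```cpp
#include <cstddef>
#include <vector>

// Two rotating accumulators: avg0 tracks the even-indexed running
// average, avg1 the odd-indexed one. Their update chains are
// independent, so the hardware can overlap them. Requires p.size() >= 2.
double ema_rotated(const std::vector<double>& p, double s)
{
    const double c  = 1.0 - s;
    const double cs = c * s;
    const double cc = c * c;
    double avg0 = p[0];                 // avg[0]
    double avg1 = s * p[1] + c * avg0;  // avg[1]
    std::size_t i = 2;
    for (; i + 1 < p.size(); i += 2) {
        avg0 = (s * p[i]     + cs * p[i - 1]) + cc * avg0;  // avg[i]
        avg1 = (s * p[i + 1] + cs * p[i])     + cc * avg1;  // avg[i+1]
    }
    if (i < p.size())                   // odd tail: one plain step
        return s * p[i] + c * avg1;
    return avg1;                        // last index was odd
}
```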
If I have done it right, in infinite precision this would be identical. (Double check me, please.)
However, in finite precision FP your results may differ, hopefully only slightly, because of different roundings. If the unrolling is correct and the answers are significantly different, you probably have a numerically unstable algorithm. You're the one who would know.
Note: floating point rounding errors will change the low bits of your answer. Both because of rearranging the code, and using MADD. I think that is probably okay, but you have to decide.
Note: the calculations for avg[i] and avg[i-1] are now independent. So you can use a SIMD instruction set, like Intel SSE2, which permits operation on two 64 bit values in a 128 bit wide register at a time. That'll be good for almost 2X, on a machine that has enough ALUs.
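As an illustration, here is one way the two lanes could be packed into an SSE2 register (a sketch of mine, assuming an x86 CPU with SSE2 and the question's recurrence avg = s*p + c*avg, c = 1 - s):

```cpp
#include <emmintrin.h>  // SSE2 intrinsics
#include <cstddef>
#include <vector>

// Lane 0 holds the even-indexed average, lane 1 the odd-indexed one.
// Each iteration advances both lanes by two data points.
double ema_sse2(const std::vector<double>& p, double s)
{
    const double  c   = 1.0 - s;
    const __m128d vs  = _mm_set1_pd(s);
    const __m128d vcs = _mm_set1_pd(c * s);
    const __m128d vcc = _mm_set1_pd(c * c);
    // [low, high] = [avg[0], avg[1]]  (_mm_set_pd takes high first)
    __m128d avg = _mm_set_pd(s * p[1] + c * p[0], p[0]);
    std::size_t i = 2;
    for (; i + 1 < p.size(); i += 2) {
        __m128d cur  = _mm_loadu_pd(&p[i]);      // [p[i],   p[i+1]]
        __m128d prev = _mm_loadu_pd(&p[i - 1]);  // [p[i-1], p[i]]
        avg = _mm_add_pd(
            _mm_add_pd(_mm_mul_pd(vs, cur), _mm_mul_pd(vcs, prev)),
            _mm_mul_pd(vcc, avg));
    }
    double lanes[2];
    _mm_storeu_pd(lanes, avg);  // lanes[0] = avg0, lanes[1] = avg1
    return (i < p.size()) ? s * p[i] + c * lanes[1]  // odd tail
                          : lanes[1];
}
```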
If you have enough registers to rewrite avg[i] in terms of avg[i-4] (and I am sure you do on IA64), then you can go 4X wide, if you have access to a machine with 256-bit AVX.
On a GPU... you can go for deeper recurrences, rewriting avg[i] in terms of avg[i-8], and so on.
Some GPUs have instructions that calculate AX+B or even AX+BY as a single instruction. Although that's more common for 32 bit than for 64 bit precision.
At some point I would probably start asking: do you want to do this on multiple prices at a time? Not only does this help you with multithreading, it will also suit it to running on a GPU. And using wide SIMD.
Minor Late Addition
I am a bit embarrassed not to have applied Horner's Rule to expressions like
avg1 = s*p[i+1] + cs*p[i] + cc*avg1
giving
avg1 = s*p[i+1] + c*(s*p[i] + c*avg1)
which needs fewer precalculated constants. Slightly different results with rounding.
In my defence, any decent compiler should do this for you.
But Horner's rule makes the dependency chain deeper in terms of multiplies: avg1 now sits behind two dependent multiplies instead of one. You might need to unroll and software pipeline the loop a few more times. Or you can keep the form
avg1 = (s*p[i+1] + cs*p[i]) + cc*avg1
where you precalculate
cs = c*s and cc = c*c
Here is a very efficient streaming formulation, albeit in C#; you would need to port it over to C++, which should be very simple. It calculates the EMA and slope on the fly.
public class ExponentialMovingAverageIndicator
{
    private bool _isInitialized;
    private readonly int _lookback;
    private readonly double _weightingMultiplier;
    private double _previousAverage;

    public double Average { get; private set; }
    public double Slope { get; private set; }

    public ExponentialMovingAverageIndicator(int lookback)
    {
        _lookback = lookback;
        _weightingMultiplier = 2.0 / (lookback + 1);
    }

    public void AddDataPoint(double dataPoint)
    {
        if (!_isInitialized)
        {
            Average = dataPoint;
            Slope = 0;
            _previousAverage = Average;
            _isInitialized = true;
            return;
        }

        Average = ((dataPoint - _previousAverage) * _weightingMultiplier) + _previousAverage;
        Slope = Average - _previousAverage;

        // update previous average
        _previousAverage = Average;
    }
}
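Since the port to C++ is described as very simple, a rough C++ equivalent might look like this (my sketch, following the C# class above line for line):

```cpp
// Sketch: a direct C++ port of the C# ExponentialMovingAverageIndicator.
class ExponentialMovingAverageIndicator {
public:
    explicit ExponentialMovingAverageIndicator(int lookback)
        : weightingMultiplier_(2.0 / (lookback + 1)) {}

    void AddDataPoint(double dataPoint)
    {
        if (!initialized_) {
            average_ = dataPoint;
            slope_ = 0.0;
            previousAverage_ = average_;
            initialized_ = true;
            return;
        }
        average_ = (dataPoint - previousAverage_) * weightingMultiplier_
                   + previousAverage_;
        slope_ = average_ - previousAverage_;
        previousAverage_ = average_;  // update previous average
    }

    double Average() const { return average_; }
    double Slope() const { return slope_; }

private:
    bool   initialized_ = false;
    double weightingMultiplier_;
    double previousAverage_ = 0.0;
    double average_ = 0.0;
    double slope_ = 0.0;
};
```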