Executive Summary: Reed's answer below is the fastest if you want to stay in C#. If you're willing to marshal to C++ (which I am), that's a faster solution.
I have two 55 MB ushort arrays in C#. I am combining them using the following loop:
    float b = (float)number / 100.0f;
    for (int i = 0; i < length; i++)
    {
        image.DataArray[i] =
            (ushort)(mUIHandler.image1.DataArray[i] +
            (ushort)(b * (float)mUIHandler.image2.DataArray[i]));
    }
Timed with DateTime.Now calls before and after the loop, this code takes 3.5 seconds to run. How can I make it faster?
EDIT: Here is some code that, I think, shows the root of the problem. When the following code is run in a brand new WPF application, I get these timing results:
Time elapsed: 00:00:00.4749156 //arrays added directly
Time elapsed: 00:00:00.5907879 //arrays contained in another class
Time elapsed: 00:00:02.8856150 //arrays accessed via accessor methods
So walking the arrays directly is somewhat faster than reaching them as fields of another object, and much faster than going through accessor methods. This suggests that my original code is somehow going through an accessor method rather than accessing the arrays directly. Even so, the fastest I seem to be able to get is about half a second. When I run the second listing of code in C++ with icc, I get:
Run time for pointer walk: 0.0743338
In this case, then, C++ is roughly 6x faster than the fastest C# loop (using icc; I'm not sure whether msvc can reach the same performance, since I'm less familiar with its optimizations). Is there any way to get C# near that level of C++ performance, or should I just have C# call my C++ routine?
Listing 1, C# code:
    using System;
    using System.Diagnostics;
    using System.Windows;

    public class ArrayHolder
    {
        int length;
        public ushort[] output;
        public ushort[] input1;
        public ushort[] input2;

        public ArrayHolder(int inLength)
        {
            length = inLength;
            output = new ushort[length];
            input1 = new ushort[length];
            input2 = new ushort[length];
        }

        public ushort[] getOutput() { return output; }
        public ushort[] getInput1() { return input1; }
        public ushort[] getInput2() { return input2; }
    }

    /// <summary>
    /// Interaction logic for MainWindow.xaml
    /// </summary>
    public partial class MainWindow : Window
    {
        public MainWindow()
        {
            InitializeComponent();

            // Fill the local arrays and the ArrayHolder with the same random data.
            Random random = new Random();
            int length = 55 * 1024 * 1024;
            ushort[] output = new ushort[length];
            ushort[] input1 = new ushort[length];
            ushort[] input2 = new ushort[length];
            ArrayHolder theArrayHolder = new ArrayHolder(length);
            for (int i = 0; i < length; i++)
            {
                output[i] = (ushort)random.Next(0, 16384);
                input1[i] = (ushort)random.Next(0, 16384);
                input2[i] = (ushort)random.Next(0, 16384);
                theArrayHolder.getOutput()[i] = output[i];
                theArrayHolder.getInput1()[i] = input1[i];
                theArrayHolder.getInput2()[i] = input2[i];
            }

            // Case 1: arrays added directly.
            Stopwatch stopwatch = new Stopwatch();
            stopwatch.Start();
            int number = 44;
            float b = (float)number / 100.0f;
            for (int i = 0; i < length; i++)
            {
                output[i] =
                    (ushort)(input1[i] +
                    (ushort)(b * (float)input2[i]));
            }
            stopwatch.Stop();
            Console.WriteLine("Time elapsed: {0}",
                stopwatch.Elapsed);

            // Case 2: arrays contained in another class, accessed as fields.
            stopwatch.Reset();
            stopwatch.Start();
            for (int i = 0; i < length; i++)
            {
                theArrayHolder.output[i] =
                    (ushort)(theArrayHolder.input1[i] +
                    (ushort)(b * (float)theArrayHolder.input2[i]));
            }
            stopwatch.Stop();
            Console.WriteLine("Time elapsed: {0}",
                stopwatch.Elapsed);

            // Case 3: arrays accessed via accessor methods.
            stopwatch.Reset();
            stopwatch.Start();
            for (int i = 0; i < length; i++)
            {
                theArrayHolder.getOutput()[i] =
                    (ushort)(theArrayHolder.getInput1()[i] +
                    (ushort)(b * (float)theArrayHolder.getInput2()[i]));
            }
            stopwatch.Stop();
            Console.WriteLine("Time elapsed: {0}",
                stopwatch.Elapsed);
        }
    }
Listing 2, C++ equivalent:
    // looptiming.cpp : Defines the entry point for the console application.
    #include "stdafx.h"
    #include <stdlib.h>
    #include <windows.h>
    #include <stdio.h>
    #include <iostream>

    int _tmain(int argc, _TCHAR* argv[])
    {
        int length = 55*1024*1024;
        unsigned short* output = new unsigned short[length];
        unsigned short* input1 = new unsigned short[length];
        unsigned short* input2 = new unsigned short[length];
        unsigned short* outPtr = output;
        unsigned short* in1Ptr = input1;
        unsigned short* in2Ptr = input2;
        int i;
        const int max = 16384;
        for (i = 0; i < length; ++i, ++outPtr, ++in1Ptr, ++in2Ptr){
            *outPtr = rand()%max;
            *in1Ptr = rand()%max;
            *in2Ptr = rand()%max;
        }

        LARGE_INTEGER ticksPerSecond;
        LARGE_INTEGER tick1, tick2; // A point in time
        LARGE_INTEGER time; // For converting tick into real time
        QueryPerformanceCounter(&tick1);
        outPtr = output;
        in1Ptr = input1;
        in2Ptr = input2;
        int number = 44;
        float b = (float)number/100.0f;
        for (i = 0; i < length; ++i, ++outPtr, ++in1Ptr, ++in2Ptr){
            *outPtr = *in1Ptr + (unsigned short)((float)*in2Ptr * b);
        }
        QueryPerformanceCounter(&tick2);
        QueryPerformanceFrequency(&ticksPerSecond);
        time.QuadPart = tick2.QuadPart - tick1.QuadPart;
        std::cout << "Run time for pointer walk: " << (double)time.QuadPart/(double)ticksPerSecond.QuadPart << std::endl;
        return 0;
    }
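Listing 2 times the loop with the Windows-specific QueryPerformanceCounter. For anyone repeating the measurement with another compiler or platform, here is a minimal sketch of a portable alternative using std::chrono::steady_clock. The helper name TimeSeconds is hypothetical and not part of the original code.

```cpp
#include <cassert>
#include <chrono>

// Hypothetical helper: runs `work` once and returns the elapsed
// wall-clock time in seconds, measured with a monotonic clock.
template <typename Work>
double TimeSeconds(Work work)
{
    const auto start = std::chrono::steady_clock::now();
    work();
    const auto stop = std::chrono::steady_clock::now();
    const std::chrono::duration<double> elapsed = stop - start;
    assert(elapsed.count() >= 0.0);  // steady_clock never goes backwards
    return elapsed.count();
}
```

The timed loop would be passed in as a lambda, for example TimeSeconds([&] { /* pointer walk from Listing 2 */ }).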
EDIT 2: Enabling /QxHost in the second example drops the time down to 0.0662714 seconds. Modifying the first loop as @Reed suggested gets me down to
Time elapsed: 00:00:00.3835017
So, still not fast enough for a slider. That time comes from this code:
    stopwatch.Start();
    Parallel.ForEach(Partitioner.Create(0, length),
        (range) =>
        {
            for (int i = range.Item1; i < range.Item2; i++)
            {
                output[i] =
                    (ushort)(input1[i] +
                    (ushort)(b * (float)input2[i]));
            }
        });
    stopwatch.Stop();
EDIT 3: As per @Eric Lippert's suggestion, I've rerun the C# code in release mode and, rather than using an attached debugger, just print the results to a dialog. They are:
(these numbers come from a 5 run average)
So the parallel solution is definitely faster than the 3.5 seconds I was getting before, but it still falls short of the 0.074 seconds achieved by the icc-compiled C++ build (without /QxHost). It seems, therefore, that the fastest solution is to compile in release and then marshal to an icc-compiled C++ executable, which makes using a slider possible here.
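Marshaling to C++ needs a native entry point that C# can call. Below is a minimal sketch of what that could look like, assuming the C++ code is built as a DLL exporting a C-style function. The name BlendImages and its parameter list are hypothetical; the loop body mirrors Listing 2.

```cpp
#include <cassert>

// Hypothetical export. extern "C" keeps the symbol name unmangled so
// that managed code can find it by name. On Windows the function would
// also need __declspec(dllexport) or a .def file entry.
extern "C" void BlendImages(const unsigned short* in1,
                            const unsigned short* in2,
                            unsigned short* out,
                            int length,
                            float b)
{
    assert(in1 && in2 && out && length >= 0);
    for (int i = 0; i < length; ++i) {
        // Same arithmetic as Listing 2: truncate the scaled value, then add.
        out[i] = static_cast<unsigned short>(
            in1[i] +
            static_cast<unsigned short>(static_cast<float>(in2[i]) * b));
    }
}
```

On the C# side, a function like this is typically bound with a [DllImport] declaration that takes the three ushort[] arrays, the length, and the float factor.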
EDIT 4: Three more suggestions from @Eric Lippert: change the loop bound from length to the array's Length property, use doubles, and try unsafe code.
For those three, the timing is now:
So far, the parallel solution is the big winner. Although if I could add these via a shader, maybe I could see some kind of speedup there...
Here's the additional code:
    // Variant 1: double arithmetic instead of float.
    stopwatch.Reset();
    stopwatch.Start();
    double b2 = ((double)number) / 100.0;
    for (int i = 0; i < output.Length; ++i)
    {
        output[i] =
            (ushort)(input1[i] +
            (ushort)(b2 * (double)input2[i]));
    }
    stopwatch.Stop();
    // TotalSeconds avoids the missing zero padding of Seconds + "." + Milliseconds.
    DoubleArrayLabel.Content += "\t" + stopwatch.Elapsed.TotalSeconds.ToString("F3");

    // Variant 2: loop bound taken from output.Length.
    stopwatch.Reset();
    stopwatch.Start();
    for (int i = 0; i < output.Length; ++i)
    {
        output[i] =
            (ushort)(input1[i] +
            (ushort)(b * input2[i]));
    }
    stopwatch.Stop();
    LengthArrayLabel.Content += "\t" + stopwatch.Elapsed.TotalSeconds.ToString("F3");
    Console.WriteLine("Time elapsed: {0}",
        stopwatch.Elapsed);

    // Variant 3: unsafe pointer walk over pinned arrays.
    stopwatch.Reset();
    stopwatch.Start();
    unsafe
    {
        fixed (ushort* outPtr = output, in1Ptr = input1, in2Ptr = input2)
        {
            ushort* outP = outPtr;
            ushort* in1P = in1Ptr;
            ushort* in2P = in2Ptr;
            for (int i = 0; i < output.Length; ++i, ++outP, ++in1P, ++in2P)
            {
                // Note: this truncates the sum once, unlike the loops above,
                // which truncate the scaled term before adding.
                *outP = (ushort)(*in1P + b * (float)*in2P);
            }
        }
    }
    stopwatch.Stop();
    UnsafeArrayLabel.Content += "\t" + stopwatch.Elapsed.TotalSeconds.ToString("F3");
    Console.WriteLine("Time elapsed: {0}",
        stopwatch.Elapsed);
This should be perfectly parallelizable. However, given the small amount of work being done per element, you'll need to handle this with extra care.
The proper way to do this (in .NET 4) would be to use Parallel.ForEach in conjunction with a Partitioner:
    float b = (float)number / 100.0f;
    Parallel.ForEach(Partitioner.Create(0, length),
        (range) =>
        {
            for (int i = range.Item1; i < range.Item2; i++)
            {
                image.DataArray[i] =
                    (ushort)(mUIHandler.image1.DataArray[i] +
                    (ushort)(b * (float)mUIHandler.image2.DataArray[i]));
            }
        });
This will efficiently partition the work across available processing cores in your system, and should provide a decent speedup if you have multiple cores.
That being said, this will, at best, only speed up this operation by a factor equal to the number of cores in your system. If you need to speed it up more, you'll likely need to resort to a mix of parallelization and unsafe code. At that point, it might be worth thinking about alternatives to trying to present this in real time.
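For readers taking the native route instead, the same range-partitioning idea can be sketched in C++: split [0, length) into a few contiguous chunks and run a tight loop over each chunk on its own thread. This is only an illustration under stated assumptions. The names BlendRange and BlendChunked are hypothetical, the chunk count is chosen by the caller, and std::thread stands in for the .NET task scheduler.

```cpp
#include <algorithm>
#include <cassert>
#include <system_error>
#include <thread>
#include <vector>

// Processes one contiguous half-open range [begin, end).
static void BlendRange(const unsigned short* in1, const unsigned short* in2,
                       unsigned short* out, int begin, int end, float b)
{
    for (int i = begin; i < end; ++i) {
        out[i] = static_cast<unsigned short>(
            in1[i] +
            static_cast<unsigned short>(static_cast<float>(in2[i]) * b));
    }
}

// Hypothetical helper: splits [0, length) into at most `chunks` ranges
// and blends each range on its own thread. Ranges never overlap, so the
// workers need no synchronization beyond the final join.
void BlendChunked(const unsigned short* in1, const unsigned short* in2,
                  unsigned short* out, int length, float b, int chunks)
{
    assert(length >= 0 && chunks > 0);
    const int chunkSize = (length + chunks - 1) / chunks;  // ceiling division
    std::vector<std::thread> workers;
    for (int begin = 0; begin < length; begin += chunkSize) {
        const int end = std::min(begin + chunkSize, length);
        try {
            workers.emplace_back(BlendRange, in1, in2, out, begin, end, b);
        } catch (const std::system_error&) {
            // Thread creation failed: process this range on the calling thread.
            BlendRange(in1, in2, out, begin, end, b);
        }
    }
    for (std::thread& worker : workers) {
        worker.join();
    }
}
```

A natural choice for the chunk count is std::thread::hardware_concurrency(), with a fallback to 1 when it reports 0. As with the C# version, the per-element work is tiny, so the loop is likely to be limited by memory bandwidth before it scales linearly with cores.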
Assuming you have a lot of elements to process and you're using .NET 4, you can attempt to parallelize the operation:
    Parallel.For(0, length, i =>
    {
        image.DataArray[i] =
            (ushort)(mUIHandler.image1.DataArray[i] +
            (ushort)(b * (float)mUIHandler.image2.DataArray[i]));
    });
Of course that is all going to depend on whether or not parallelization of this would be worth it. That statement looks fairly computationally short; accessing indices by number is pretty fast as is. You might get gains because this loop is being run so many times with that much data.