Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Optimized way to find M largest elements in an NxN array using C++

I need a blazing fast way to find the 2D positions and values of the M largest elements in an NxN array.

right now I'm doing this:

struct SourcePoint {
    Point point;
    float value;
}

SourcePoint* maxValues = new SourcePoint[ M ];

maxCoefficients = new SourcePoint*[
for (int j = 0; j < rows; j++) {
    for (int i = 0; i < cols; i++) {
        float sample = arr[i][j];
        if (sample > maxValues[0].value) {
            int q = 1;
            while ( sample > maxValues[q].value && q < M ) {
                maxValues[q-1] = maxValues[q];      // shuffle the values back
                 q++;
            }
            maxValues[q-1].value = sample;
            maxValues[q-1].point = Point(i,j);
        }
    }
}

A Point struct is just two ints - x and y.

This code basically does an insertion sort of the values coming in. maxValues[0] always contains the SourcePoint with the lowest value that still keeps it within the top M values encoutered so far. This gives us a quick and easy bailout if sample <= maxValues, we don't do anything. The issue I'm having is the shuffling every time a new better value is found. It works its way all the way down maxValues until it finds it's spot, shuffling all the elements in maxValues to make room for itself.

I'm getting to the point where I'm ready to look into SIMD solutions, or cache optimisations, since it looks like there's a fair bit of cache thrashing happening. Cutting the cost of this operation down will dramatically affect the performance of my overall algorithm since this is called many many times and accounts for 60-80% of my overall cost.

I've tried using a std::vector and make_heap, but I think the overhead for creating the heap outweighed the savings of the heap operations. This is likely because M and N generally aren't large. M is typically 10-20 and N 10-30 (NxN 100 - 900). The issue is this operation is called repeatedly, and it can't be precomputed.

I just had a thought to pre-load the first M elements of maxValues which may provide some small savings. In the current algorithm, the first M elements are guaranteed to shuffle themselves all the way down just to initially fill maxValues.

Any help from optimization gurus would be much appreciated :)

like image 800
wallacer Avatar asked Aug 19 '11 19:08

wallacer


4 Answers

A few ideas you can try. In some quick tests with N=100 and M=15 I was able to get it around 25% faster in VC++ 2010 but test it yourself to see whether any of them help in your case. Some of these changes may have no or even a negative effect depending on the actual usage/data and compiler optimizations.

  • Don't allocate a new maxValues array each time unless you need to. Using a stack variable instead of dynamic allocation gets me +5%.
  • Changing g_Source[i][j] to g_Source[j][i] gains you a very little bit (not as much as I'd thought there would be).
  • Using the structure SourcePoint1 listed at the bottom gets me another few percent.
  • The biggest gain of around +15% was to replace the local variable sample with g_Source[j][i]. The compiler is likely smart enough to optimize out the multiple reads to the array which it can't do if you use a local variable.
  • Trying a simple binary search netted me a small loss of a few percent. For larger M/Ns you'd likely see a benefit.
  • If possible try to keep the source data in arr[][] sorted, even if only partially. Ideally you'd want to generate maxValues[] at the same time the source data is created.
  • Look at how the data is created/stored/organized may give you patterns or information to reduce the amount of time to generate your maxValues[] array. For example, in the best case you could come up with a formula that gives you the top M coordinates without needing to iterate and sort.

Code for above:

struct SourcePoint1 {
     int x;
     int y;
     float value;
     int test;       //Play with manual/compiler padding if needed
};
like image 51
uesp Avatar answered Nov 14 '22 06:11

uesp


If you want to go into micro-optimizations at this point, the a simple first step should be to get rid of the Points and just stuff both dimensions into a single int. That reduces the amount of data you need to shift around, and gets SourcePoint down to being a power of two long, which simplifies indexing into it.

Also, are you sure that keeping the list sorted is better than simply recomputing which element is the new lowest after each time you shift the old lowest out?

like image 41
hmakholm left over Monica Avatar answered Nov 14 '22 07:11

hmakholm left over Monica


(Updated 22:37 UTC 2011-08-20)

I propose a binary min-heap of fixed size holding the M largest elements (but still in min-heap order!). It probably won't be faster in practice, as I think OPs insertion sort probably has decent real world performance (at least when the recommendations of the other posteres in this thread are taken into account).

Look-up in the case of failure should be constant time: If the current element is less than the minimum element of the heap (containing the max M elements) we can reject it outright.

If it turns out that we have an element bigger than the current minimum of the heap (the Mth biggest element) we extract (discard) the previous min and insert the new element.

If the elements are needed in sorted order the heap can be sorted afterwards.

First attempt at a minimal C++ implementation:

template<unsigned size, typename T>
class m_heap {
private: 
    T nodes[size];
    static const unsigned last = size - 1;

    static unsigned parent(unsigned i) { return (i - 1) / 2; }
    static unsigned left(unsigned i) { return i * 2; }
    static unsigned right(unsigned i) { return i * 2 + 1; }

    void bubble_down(unsigned int i) {
        for (;;) { 
            unsigned j = i;
            if (left(i) < size && nodes[left(i)] < nodes[i])
                j = left(i);
            if (right(i) < size && nodes[right(i)] < nodes[j])
                j = right(i);
            if (i != j) {
                swap(nodes[i], nodes[j]);
                i = j;
            } else {
                break;
            }
        }
    }

    void bubble_up(unsigned i) {
        while (i > 0 && nodes[i] < nodes[parent(i)]) {
            swap(nodes[parent(i)], nodes[i]);
            i = parent(i);
        }
    }

public:
    m_heap() {
        for (unsigned i = 0; i < size; i++) {
            nodes[i] = numeric_limits<T>::min();
        }
    }

    void add(const T& x) {
        if (x < nodes[0]) {
            // reject outright 
            return;
        }
        nodes[0] = x;
        swap(nodes[0], nodes[last]);
        bubble_down(0);
    }
};

Small test/usage case:

#include <iostream>
#include <limits>
#include <algorithm>
#include <vector>
#include <stdlib.h>
#include <assert.h>
#include <math.h>

using namespace std;

// INCLUDE TEMPLATED CLASS FROM ABOVE

typedef vector<float> vf;
bool compare(float a, float b) { return a > b; }

int main()
{
    int N = 2000;
    vf v;
    for (int i = 0; i < N; i++) v.push_back( rand()*1e6 / RAND_MAX);

    static const int M = 50;
    m_heap<M, float> h;
    for (int i = 0; i < N; i++) h.add( v[i] );

    sort(v.begin(), v.end(), compare);

    vf heap(h.get(), h.get() + M); // assume public in m_heap: T* get() { return nodes; }
    sort(heap.begin(), heap.end(), compare);

    cout << "Real\tFake" << endl;
    for (int i = 0; i < M; i++) {
        cout << v[i] << "\t" << heap[i] << endl;
        if (fabs(v[i] - heap[i]) > 1e-5) abort();
    }
}
like image 4
user786653 Avatar answered Nov 14 '22 06:11

user786653


You're looking for a priority queue:

template < class T, class Container = vector<T>,
       class Compare = less<typename Container::value_type> > 
       class priority_queue;

You'll need to figure out the best underlying container to use, and probably define a Compare function to deal with your Point type.

If you want to optimize it, you could run a queue on each row of your matrix in its own worker thread, then run an algorithm to pick the largest item of the queue fronts until you have your M elements.

like image 2
Mike DeSimone Avatar answered Nov 14 '22 06:11

Mike DeSimone