
Performing intersection of vector c++

Tags: c++, c++11, vector

I have 200 vectors of size ranging from 1 to 4000000 stored in vecOfVec. I need to intersect each of these vectors with a single vector, vecSearched, which has 9000+ elements. I tried the following code, but profiling with perf shows that the intersection is the bottleneck. Is there a more efficient way to perform the intersection?

#include <cstdlib>
#include <iostream>
#include <vector>

using namespace std;

int main(int argc, char** argv) {
  vector<vector<unsigned> > vecOfVec; //contains 120 vectors of size ranging from 1 to 2000000 elements. All vectors in vecOfVec are sorted
  vector<unsigned> vecSearched; //vector searched in contains 9000+ elements. Elements of vecSearched are sorted
  for(unsigned kbt=0; kbt<vecOfVec.size(); kbt++)
  {                                  
     //first find first 9 values spaced at equi-distant places, use these 9 values for performing comparisons
     vector<unsigned> equiSpacedVec;                             

     if(((vecSearched[0]))>vecOfVec[kbt][(vecOfVec[kbt].size())-1]) //if beginning of searched vector > last value present in individual vectors of vecOfVec then continue
     {
         continue;                             
     }                         

     unsigned elementIndex=0; //used for iterating over equiSpacedVec                             
     unsigned i=0; //used for iterating over the individual bucket vecOfVec[kbt]
     //search for value in bucket and store it in bucketValPos
     bool firstRun=true;             
     for(vector<unsigned>::iterator itValPos=vecSearched.begin();itValPos!=vecSearched.end();++itValPos)
     {
         //construct a summarized vector out of individual vectors of vecOfVec
         if(firstRun)
         {
             firstRun=false;
             unsigned elementIndex1=0; //used for iterating over equiSpacedVec
             while(elementIndex1<(vecOfVec[kbt].size())) //create a small vector for skipping over the remaining vectors
             {
                  if((elementIndex1+(10000))<(vecOfVec[kbt].size()))
                     elementIndex1+=10000;
                  else
                     break;
                 equiSpacedVec.push_back(vecOfVec[kbt][elementIndex1]);
             }          
         }
         //skip individual vectors of vecOfVec using summarized vector constructed above
         while((!(equiSpacedVec.empty()))&&(equiSpacedVec.size()>(elementIndex+1))&&((*itValPos)>equiSpacedVec[elementIndex+1])){
             elementIndex+=1;
             if((i+100)<(vecOfVec[kbt].size()))
                 i+=100;
         }            
         unsigned j=i;
         while((j<vecOfVec[kbt].size())&&((*itValPos)>vecOfVec[kbt][j])){ //check the bound before reading vecOfVec[kbt][j]
             j++;
         }
         if(j>(vecOfVec[kbt].size()-1)) //element not found even at last position.
         {
             break;
         }              

         if((*itValPos)==vecOfVec[kbt][j])
         {                                     
             //store intersection result
         }
     }
  }
    return 0;
}
Steg Verner asked Jun 26 '15 05:06


2 Answers

Your problem is a very popular one. Since you have no data correlating the vectors to intersect, it boils down to speeding up the intersection between two vectors, and there are basically two approaches to it:

1. Without any preprocessing

This is usually addressed by three things:

  • Reducing the number of comparisons. E.g. for small vectors (sized 1 to 50) you should binary search each of their elements in the subject vector, to avoid traversing all of its 9000+ elements.

  • Improving code quality to reduce branch mispredictions. E.g. observing that the resulting set will usually be smaller than the input sets, you could transform code such as:

    while (Apos < Aend && Bpos < Bend) {
        if (A[Apos] == B[Bpos]) {
            C[Cpos++] = A[Apos];
            Apos++; Bpos++;
        }
        else if (A[Apos] > B[Bpos]) {
            Bpos++;
        } 
        else {
            Apos++;
        }
    }
    

    into code that "unrolls" the comparisons, creating branches that are easier to predict (example for block size = 2):

    while (Apos + 1 < Aend && Bpos + 1 < Bend) {  // two elements must remain in each input
        Adat0 = A[Apos]; Adat1 = A[Apos + 1]; 
        Bdat0 = B[Bpos]; Bdat1 = B[Bpos + 1];
        if (Adat0 == Bdat0) { 
            C[Cpos++] = Adat0;
        }
        else if (Adat0 == Bdat1) {
            C[Cpos++] = Adat0;
            goto advanceB;
        }
        else if (Adat1 == Bdat0) {
            C[Cpos++] = Adat1;
            goto advanceA;
        }
        if (Adat1 == Bdat1) {
            C[Cpos++] = Adat1;
            goto advanceAB;
        }
        else if (Adat1 > Bdat1) goto advanceB;
        else goto advanceA;
    advanceA:
        Apos+=2;
        if (Apos >= Aend) { break; } else { continue; }
    advanceB:
        Bpos+=2;
        if (Bpos >= Bend) { break; } else { continue; }
    advanceAB:
        Apos+=2;  Bpos+=2;
        if (Apos >= Aend || Bpos >= Bend) { break; }
    }
    //  fall back to naive algorithm for remaining elements
    
  • Using SIMD instructions to perform block operations

These techniques are hard to describe in a Q&A context, but you can read about them (plus relevant optimizations like if-conversion) here and here, or find implementation elements here.
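The first bullet (binary searching instead of traversing) can be sketched roughly as follows. This is a minimal illustration, not a tuned implementation: the function name intersectBySearch is made up for this example, both inputs are assumed to be sorted and free of duplicates, and the shorter vector is the one whose elements are looked up in the longer one.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Intersect a short sorted vector with a long sorted one by binary-searching
// each short element in the not-yet-searched tail of the long vector.
// Cost is about small.size() * log2(large.size()) comparisons.
// (intersectBySearch is a hypothetical name used for illustration.)
std::vector<unsigned> intersectBySearch(const std::vector<unsigned>& small,
                                        const std::vector<unsigned>& large) {
    assert(std::is_sorted(small.begin(), small.end()));
    assert(std::is_sorted(large.begin(), large.end()));
    std::vector<unsigned> out;
    auto from = large.begin();
    for (unsigned value : small) {
        // lower_bound returns the first position whose element is >= value.
        from = std::lower_bound(from, large.end(), value);
        if (from == large.end()) break;           // nothing left that could match
        if (*from == value) out.push_back(value);
    }
    return out;
}
```

Because the search restarts from the previous hit, the searched range keeps shrinking. When the two vectors have similar sizes, a plain linear merge is likely the better choice.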

2. With preprocessing

This, IMHO, is the better way for your case, because you have a single subject vector of 9000+ elements. You could make an interval tree out of it or simply find a way to index it, e.g. make a structure that will speed up searches against it:

vector<unsigned> subject;  // the 9000+ sorted elements
vector<range>    index;    // one entry per block of M elements, about subject.size()/M entries

where range is a struct like

struct range {
    unsigned min, max; 
}; 

thus creating a sequence of ranges like so

[0, 100], [101, 857], ... [33221, 33500]

that will let you skip many comparisons when doing the intersection (for example, if an element of the other set is larger than the max of a subrange, you can skip that subrange completely).
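One way the range index could look in practice is sketched below. The answer gives only the range struct, so the helpers buildIndex and intersectIndexed and the blockSize parameter (the M above) are assumptions made for this example: each range stores the min and max of one block of blockSize consecutive subject elements.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

struct range {
    unsigned min, max;   // smallest and largest value in one block
};

// Build one range per block of blockSize consecutive elements of the
// sorted subject vector (hypothetical helper).
std::vector<range> buildIndex(const std::vector<unsigned>& subject,
                              std::size_t blockSize) {
    assert(blockSize > 0);
    std::vector<range> index;
    for (std::size_t start = 0; start < subject.size(); start += blockSize) {
        std::size_t last = std::min(start + blockSize, subject.size()) - 1;
        index.push_back({subject[start], subject[last]});
    }
    return index;
}

// Intersect sorted `other` with `subject`, skipping whole blocks of the
// subject through the index (hypothetical helper).
std::vector<unsigned> intersectIndexed(const std::vector<unsigned>& other,
                                       const std::vector<unsigned>& subject,
                                       const std::vector<range>& index,
                                       std::size_t blockSize) {
    std::vector<unsigned> out;
    std::size_t block = 0;
    for (unsigned value : other) {
        // Skip every block whose max is below the current value.
        while (block < index.size() && index[block].max < value) ++block;
        if (block == index.size()) break;          // past the end of the subject
        if (value < index[block].min) continue;    // value lies in a gap between blocks
        auto first = subject.begin() + block * blockSize;
        auto last  = subject.begin() + std::min((block + 1) * blockSize, subject.size());
        if (std::binary_search(first, last, value)) out.push_back(value);
    }
    return out;
}
```

The index is built once and reused for all the vectors in vecOfVec, so its construction cost is amortized. The best block size is something to measure rather than guess.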

3. Parallelize

Yep, there's always a third element in a list of two :P. Once you have optimized the procedure enough (and only then), break up your work into chunks and run them in parallel. The problem is embarrassingly parallel, so 200 vectors vs 1 should definitely run as "50 vs 1, four times concurrently".
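A rough sketch of that split using C++11 std::thread is shown below. The function name intersectAllParallel and the numThreads parameter are assumptions for this example, and std::set_intersection stands in for whatever optimized per-vector routine you end up with. On some toolchains you need to compile with -pthread.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <iterator>
#include <thread>
#include <vector>

// Intersect every vector in vecOfVec with vecSearched, splitting the outer
// loop across numThreads workers. Each worker writes only to its own slots
// of `results`, so no locking is needed.
// (intersectAllParallel is a hypothetical name used for illustration.)
std::vector<std::vector<unsigned>> intersectAllParallel(
        const std::vector<std::vector<unsigned>>& vecOfVec,
        const std::vector<unsigned>& vecSearched,
        unsigned numThreads) {
    assert(numThreads > 0);
    std::vector<std::vector<unsigned>> results(vecOfVec.size());
    auto worker = [&](std::size_t first) {
        // Strided assignment: worker k handles vectors k, k + numThreads, ...
        for (std::size_t i = first; i < vecOfVec.size(); i += numThreads) {
            std::set_intersection(vecOfVec[i].begin(), vecOfVec[i].end(),
                                  vecSearched.begin(), vecSearched.end(),
                                  std::back_inserter(results[i]));
        }
    };
    std::vector<std::thread> threads;
    for (unsigned t = 0; t < numThreads; ++t) threads.emplace_back(worker, t);
    for (std::thread& th : threads) th.join();
    return results;
}
```

The strided assignment is one possible choice. Since the vectors range from 1 to millions of elements, contiguous chunks of 50 could leave one thread with most of the work.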

Test, measure, redesign!!

Nikos Athanasiou answered Sep 18 '22 01:09


If I understood your code correctly, then if N is the number of vectors and M the number of elements inside each vector, your algorithm is roughly O(N * M^2). The 'bucket' strategy improves things a bit, but its effect is difficult to evaluate at first sight.

I would suggest working on sorted vectors and intersecting those. Something like this:

vector<vector<unsigned> > vecOfVec;
vector<unsigned> vecSearched ;
for (vector<unsigned> v : vecOfVec) // yes, a copy
{
   std::sort(v.begin(), v.end()) ;
   if (vecSearched.empty()) // first run detection
      vecSearched = v ; 
   else
   {  // compute intersection of v and vecSearched
      auto vit = v.begin() ;
      auto vend = v.end() ;
      auto sit = vecSearched.begin() ;
      auto send = vecSearched.end() ;
      vector<unsigned> result ;
      while (vit != vend && sit != send)
      {
         if (*vit < *sit)
             ++vit ;
         else if (*vit == *sit)
         {
             result.push_back(*vit) ; 
             ++vit ;
             ++sit ;
         }
         else //  *vit > *sit
            ++sit ;
      }
      vecSearched = result ;
   }
}

The code is untested; anyway, the idea behind it is that intersection on sorted vectors is easier, since you can compare the two iterators (vit, sit) and advance the one pointing to the smaller element. So the intersection is linear in M and the whole complexity is O(N * M * log(M)), where the log(M) factor is due to sorting.
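For reference, the standard library already ships this two-iterator merge as std::set_intersection in <algorithm>. A minimal sketch, assuming both inputs are already sorted (the wrapper name intersectSorted is made up for this example):

```cpp
#include <algorithm>
#include <cassert>
#include <iterator>
#include <vector>

// Linear-time intersection of two sorted vectors using the standard
// algorithm. (intersectSorted is a hypothetical wrapper name.)
std::vector<unsigned> intersectSorted(const std::vector<unsigned>& a,
                                      const std::vector<unsigned>& b) {
    assert(std::is_sorted(a.begin(), a.end()));
    assert(std::is_sorted(b.begin(), b.end()));
    std::vector<unsigned> result;
    // The intersection can never be larger than the smaller input.
    result.reserve(std::min(a.size(), b.size()));
    std::set_intersection(a.begin(), a.end(), b.begin(), b.end(),
                          std::back_inserter(result));
    return result;
}
```

If the vectors in vecOfVec are already sorted, as the question states, the std::sort step can be dropped and each intersection costs at most about the sum of the two sizes in comparisons.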

marom answered Sep 18 '22 01:09