I have 200 vectors with sizes ranging from 1 to 4000000, stored in vecOfVec. I need to intersect these vectors with a single vector "vecSearched" of 9000+ elements. I tried to do this using the following code; however, using the perf tool I found that the intersection is the bottleneck in my code. Is there some way to perform the intersection more efficiently?
#include <cstdlib>
#include <iostream>
#include <vector>
using namespace std;

int main(int argc, char** argv) {
    vector<vector<unsigned> > vecOfVec; // contains 120 vectors of size ranging from 1 to 2000000 elements. All vectors in vecOfVec are sorted
    vector<unsigned> vecSearched;       // the searched vector contains 9000+ elements and is sorted

    for(unsigned kbt=0; kbt<vecOfVec.size(); kbt++)
    {
        // first find 9 values spaced at equidistant places, use these 9 values for performing comparisons
        vector<unsigned> equiSpacedVec;
        if(vecSearched[0] > vecOfVec[kbt][vecOfVec[kbt].size()-1]) // if beginning of searched vector > last value of this vector of vecOfVec then continue
        {
            continue;
        }
        unsigned elementIndex=0; // used for iterating over equiSpacedVec
        unsigned i=0;            // used for iterating over the individual vector vecOfVec[kbt]
        // search for value in bucket and store it in bucketValPos
        bool firstRun=true;
        for(vector<unsigned>::iterator itValPos=vecSearched.begin(); itValPos!=vecSearched.end(); ++itValPos)
        {
            // construct a summarized vector out of the individual vector of vecOfVec
            if(firstRun)
            {
                firstRun=false;
                unsigned elementIndex1=0; // used for iterating over vecOfVec[kbt]
                while(elementIndex1<vecOfVec[kbt].size()) // create a small vector for skipping over the remaining elements
                {
                    if((elementIndex1+10000)<vecOfVec[kbt].size())
                        elementIndex1+=10000;
                    else
                        break;
                    equiSpacedVec.push_back(vecOfVec[kbt][elementIndex1]);
                }
            }
            // skip over parts of vecOfVec[kbt] using the summarized vector constructed above
            while(!equiSpacedVec.empty() && equiSpacedVec.size()>(elementIndex+1) && (*itValPos)>equiSpacedVec[elementIndex+1])
            {
                elementIndex+=1;
                if((i+100)<vecOfVec[kbt].size())
                    i+=100;
            }
            unsigned j=i;
            while(j<vecOfVec[kbt].size() && (*itValPos)>vecOfVec[kbt][j]) // bound check before dereferencing
            {
                j++;
            }
            if(j>(vecOfVec[kbt].size()-1)) // element not found even at last position
            {
                break;
            }
            if((*itValPos)==vecOfVec[kbt][j])
            {
                // store intersection result
            }
        }
    }
    return 0;
}
Your problem is a very popular one. Since you have no data correlating the vectors to intersect, it boils down to speeding up the intersection between two vectors, and there are basically two approaches to it:

1. Speeding up the pairwise (merge-style) intersection

This is usually addressed by three things:
Reducing the number of comparisons. E.g. for small vectors (sized 1 to 50) you should binary-search each element rather than traversing all 9000+ elements of the subject vector.
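A minimal sketch of that idea, assuming both vectors are sorted (the function name intersect_small is illustrative only):

#include <algorithm>
#include <vector>

// Binary-search each element of the small vector in the large, sorted subject.
std::vector<unsigned> intersect_small(const std::vector<unsigned>& smallVec,
                                      const std::vector<unsigned>& subject)
{
    std::vector<unsigned> result;
    for (unsigned v : smallVec) {
        auto it = std::lower_bound(subject.begin(), subject.end(), v);
        if (it != subject.end() && *it == v)   // value is present in the subject
            result.push_back(v);
    }
    return result;
}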
Improving code quality to reduce branch mispredictions. E.g. observing that the resulting set will usually be smaller than the input sets, you could transform code like this:
while (Apos < Aend && Bpos < Bend) {
    if (A[Apos] == B[Bpos]) {
        C[Cpos++] = A[Apos];
        Apos++; Bpos++;
    }
    else if (A[Apos] > B[Bpos]) {
        Bpos++;
    }
    else {
        Apos++;
    }
}
into code that "unrolls" the comparisons, creating more but easier-to-predict branches (example for block size = 2):
while (1) {
    Adat0 = A[Apos]; Adat1 = A[Apos + 1];
    Bdat0 = B[Bpos]; Bdat1 = B[Bpos + 1];
    if (Adat0 == Bdat0) {
        C[Cpos++] = Adat0;
    }
    else if (Adat0 == Bdat1) {
        C[Cpos++] = Adat0;
        goto advanceB;
    }
    else if (Adat1 == Bdat0) {
        C[Cpos++] = Adat1;
        goto advanceA;
    }
    if (Adat1 == Bdat1) {
        C[Cpos++] = Adat1;
        goto advanceAB;
    }
    else if (Adat1 > Bdat1) goto advanceB;
    else goto advanceA;
advanceA:
    Apos += 2;
    if (Apos >= Aend) { break; } else { continue; }
advanceB:
    Bpos += 2;
    if (Bpos >= Bend) { break; } else { continue; }
advanceAB:
    Apos += 2; Bpos += 2;
    if (Apos >= Aend || Bpos >= Bend) { break; }
}
// fall back to the naive algorithm for the remaining elements
Using SIMD instructions to perform block operations
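As a very rough illustration of the SIMD idea (not from the papers referenced below), here is a sketch that probes one element of A against a block of four elements of B at once using SSE2 intrinsics. It assumes both vectors are sorted and hold unique values; the name simd_intersect is illustrative only:

#include <emmintrin.h> // SSE2 intrinsics
#include <cstddef>
#include <vector>

std::size_t simd_intersect(const std::vector<unsigned>& A,
                           const std::vector<unsigned>& B,
                           std::vector<unsigned>& C)
{
    std::size_t a = 0, b = 0;
    while (a < A.size() && b + 4 <= B.size()) {
        __m128i key   = _mm_set1_epi32((int)A[a]);                  // broadcast A[a]
        __m128i block = _mm_loadu_si128((const __m128i*)&B[b]);     // load B[b..b+3]
        int hit = _mm_movemask_epi8(_mm_cmpeq_epi32(key, block));   // nonzero if any lane matched
        if (hit)
            C.push_back(A[a]);
        if (A[a] <= B[b + 3])
            ++a;        // A[a] cannot occur later in B, move to the next key
        else
            b += 4;     // the whole block is smaller than A[a], skip it
    }
    // scalar merge for the remaining tail
    while (a < A.size() && b < B.size()) {
        if (A[a] < B[b]) ++a;
        else if (B[b] < A[a]) ++b;
        else { C.push_back(A[a]); ++a; ++b; }
    }
    return C.size();
}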
These techniques are hard to describe in a QA context, but you can read about them (plus relevant optimizations like the if conversion) here and here, or find implementation elements here.
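For a flavour of the if-conversion idea, a sketch reusing the names from the loop above (C must be sized for the worst-case output, since the write is unconditional):

while (Apos < Aend && Bpos < Bend) {
    unsigned a = A[Apos], b = B[Bpos];
    C[Cpos] = a;         // speculative, unconditional write
    Cpos += (a == b);    // keep it only when both sides match
    Apos += (a <= b);    // advance whichever side holds the smaller value
    Bpos += (b <= a);
}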
2. Indexing the subject vector

This IMHO is a better way for your case because you have a single subject vector of 9000+ elements. You could make an interval tree out of it, or simply find a way to index it, e.g. make a structure that speeds up searches against it:
vector<unsigned> subject;   // the 9000+ element vector, sorted
vector<range> index;        // roughly subject.size() / M entries, one per block of M elements
where range is a struct like
struct range {
    unsigned min, max;
};
thus creating a sequence of ranges like so
[0, 100], [101, 857], ... [33221, 33500]
that will allow you to skip many comparisons when doing the intersection (for example, if an element of the other set is larger than the max of a subrange, you can skip that subrange completely). A rough sketch of the idea follows.
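A minimal sketch, assuming the subject vector is sorted; I have added a hypothetical "first" field to range to remember where each block starts, and the function names are illustrative only:

#include <algorithm>
#include <vector>

struct range {
    unsigned min, max;   // smallest / largest value in the block
    unsigned first;      // position of the block's first element in the subject
};

// Build one range per block of M consecutive elements of the sorted subject.
std::vector<range> build_index(const std::vector<unsigned>& subject, unsigned M)
{
    std::vector<range> index;
    for (unsigned i = 0; i < subject.size(); i += M) {
        unsigned last = std::min<unsigned>(i + M, (unsigned)subject.size()) - 1;
        index.push_back({subject[i], subject[last], i});
    }
    return index;
}

// Intersect one sorted vector against the indexed subject: blocks whose max is
// below the current element are skipped without touching their elements.
void intersect_indexed(const std::vector<unsigned>& v,
                       const std::vector<unsigned>& subject,
                       const std::vector<range>& index,
                       std::vector<unsigned>& out)
{
    unsigned block = 0, pos = 0;
    for (unsigned x : v) {
        while (block < index.size() && index[block].max < x) {  // skip whole blocks
            ++block;
            pos = (block < index.size()) ? index[block].first : (unsigned)subject.size();
        }
        if (block == index.size()) break;        // subject exhausted
        if (x < index[block].min) continue;      // x falls between two blocks: not present
        while (pos < subject.size() && subject[pos] < x) ++pos; // scan inside the block
        if (pos < subject.size() && subject[pos] == x) out.push_back(x);
    }
}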
3. Parallelism

Yep, there's always a third element in a list of two :P. When you have optimized the procedure enough (and only then), do break up your work into chunks and run them in parallel. The problem is embarrassingly parallel, so 200 vectors vs 1 should definitely run as "50 vs 1, four times concurrently".
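A sketch of that split using std::async; intersect_one stands for whichever single-vector intersection routine you settle on, and the other names are illustrative:

#include <algorithm>
#include <future>
#include <vector>

std::vector<unsigned> intersect_one(const std::vector<unsigned>& v,
                                    const std::vector<unsigned>& subject);

std::vector<std::vector<unsigned>>
intersect_all_parallel(const std::vector<std::vector<unsigned>>& vecOfVec,
                       const std::vector<unsigned>& vecSearched,
                       std::size_t nThreads = 4)
{
    std::vector<std::vector<unsigned>> results(vecOfVec.size());
    std::vector<std::future<void>> tasks;
    std::size_t chunk = (vecOfVec.size() + nThreads - 1) / nThreads;
    for (std::size_t begin = 0; begin < vecOfVec.size(); begin += chunk) {
        std::size_t end = std::min(begin + chunk, vecOfVec.size());
        tasks.push_back(std::async(std::launch::async, [&, begin, end] {
            for (std::size_t k = begin; k < end; ++k)       // each task owns a disjoint slice
                results[k] = intersect_one(vecOfVec[k], vecSearched);
        }));
    }
    for (auto& t : tasks) t.get();   // wait for all chunks to finish
    return results;
}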
Test, measure, redesign!!
If I understood your code correctly, then with N the number of vectors and M the number of elements inside each vector, your algorithm is roughly O(N * M^2). The 'bucket' strategy improves things a bit, but its effect is difficult to evaluate at first sight.
I would suggest you work on sorted vectors and intersect them while they are sorted. Something like this:
vector<vector<unsigned> > vecOfVec;
vector<unsigned> vecSearched;

for (vector<unsigned> v : vecOfVec) // yes, a copy
{
    std::sort(v.begin(), v.end());
    if (vecSearched.empty()) // first run detection
        vecSearched = v;
    else
    {   // compute intersection of v and vecSearched
        auto vit  = v.begin();
        auto vend = v.end();
        auto sit  = vecSearched.begin();
        auto send = vecSearched.end();
        vector<unsigned> result;
        while (vit != vend && sit != send)
        {
            if (*vit < *sit)
                ++vit;
            else if (*vit == *sit)
            {
                result.push_back(*vit);
                ++vit;
                ++sit;
            }
            else // *vit > *sit
                ++sit;
        }
        vecSearched = result;
    }
}
The code is untested, but the idea behind it is that intersecting sorted vectors is easy: you compare the two iterators (vit, sit) and advance the one pointing to the smaller value. So each intersection is linear in M, and the whole complexity is O(N * M * log(M)), where the log(M) factor comes from sorting.
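For reference, the standard library already provides exactly this merge step: std::set_intersection from <algorithm> performs the same two-pointer walk over two sorted ranges. A small usage sketch:

#include <algorithm>
#include <iterator>
#include <vector>

// Intersection of two sorted vectors via the standard algorithm.
std::vector<unsigned> intersect(const std::vector<unsigned>& a,
                                const std::vector<unsigned>& b)
{
    std::vector<unsigned> out;
    std::set_intersection(a.begin(), a.end(), b.begin(), b.end(),
                          std::back_inserter(out));
    return out;
}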