I have a fairly simple problem but I cannot figure out an elegant solution to it.
I have Thrust code which produces c vectors of the same size containing values. Let's say each of these c vectors has an index. For each vector position, I would like to get the index of the c vector whose value is the lowest:
Example:
C0 = (0,10,20,3,40)
C1 = (1,2 ,3 ,5,10)
As a result, I would get a vector containing the index of the c vector which has the lowest value:
result = (0,1 ,1 ,0,1)
I have thought about doing it using Thrust zip iterators, but have come across issues: I could zip all the c vectors and implement an arbitrary transformation which takes a tuple and returns the index of its lowest value, but tuples hold at most 10 elements and there can be many more than 10 c vectors.
I have then thought about doing it this way: instead of having c separate vectors, append them all into a single vector C, then generate keys referencing the positions and perform a stable sort by key, which will regroup the vector entries from the same position together. In the example that would give:
C = (0,10,20,3,40,1,2,3,5,10)
keys = (0,1 ,2 ,3,4 ,0,1,2,3,4 )
after stable sort by key:
output = (0,1,10,2,20,3,3,5,40,10)
keys = (0,0,1 ,1,2 ,2,3,3,4 ,4 )
Then generate keys with the positions in the vector, zip the output with the index of the c vectors, and then perform a reduce by key with a custom functor which, for each reduction, outputs the index with the lowest value. In the example:
input = (0,1,10,2,20,3,3,5,40,10)
indexes= (0,1,0 ,1,0 ,1,0,1,0 ,1)
keys = (0,0,1 ,1,2 ,2,3,3,4 ,4)
after reduce by keys on zipped input and indexes:
output = (0,1,1,0,1)
However, how do I write such a functor for the reduce by key operation?
Since your vectors all have the same length, it is better to concatenate them and treat them as a matrix C.
Then your problem becomes finding the index of the min element of each column in a row-major matrix. It can be solved as follows.
In step 1, you proposed to use stable_sort_by_key to rearrange the element order, which is not an efficient method, since the rearrangement can be calculated directly given the #row and #col of the matrix. In Thrust, it can be done with permutation iterators as:
thrust::make_permutation_iterator(
    c.begin(),
    thrust::make_transform_iterator(
        thrust::make_counting_iterator((int) 0),
        (_1 % row) * col + _1 / row)
)
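To see what that index expression does, here is a minimal host-side sketch in plain C++ (no Thrust involved). The function name gather_column_major is made up for illustration; only the expression (i % row) * col + i / row comes from the snippet above.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Host-side illustration only: reads a row-major row x col matrix in
// column-by-column order, using the same index expression as the
// permutation iterator. gather_column_major is a hypothetical name.
std::vector<float> gather_column_major(const std::vector<float>& c, int row, int col) {
    assert(row > 0 && col > 0);
    assert(c.size() == static_cast<std::size_t>(row) * static_cast<std::size_t>(col));
    std::vector<float> out(c.size());
    for (int i = 0; i < row * col; ++i) {
        // i / row is the column being visited, i % row is the row within it.
        out[i] = c[(i % row) * col + i / row];
    }
    return out;
}
```

For the example data with row = 2 and col = 5 this yields the same ordering that the stable sort by key in the question produces, without doing any sort.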
In step 2, reduce_by_key can do exactly what you want. In your case the reduction binary-op functor is easy: comparison on tuples (the elements of your zipped vector) is already defined and is lexicographic, so the 1st element of the tuple (the value) decides and the index only breaks ties. Thrust supports it as
thrust::minimum< thrust::tuple<float, int> >()
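The effect of taking the minimum over (value, index) tuples can be sketched on the host with std::pair, which also compares lexicographically. This is only a stand-in for the Thrust reduction, assuming equal-length segments of row consecutive elements; argmin_per_segment is a hypothetical helper name.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Host-side stand-in for the reduction step: each element is a
// (value, index) pair and every run of `row` consecutive elements
// forms one segment. argmin_per_segment is a hypothetical name.
std::vector<int> argmin_per_segment(const std::vector<std::pair<float, int>>& zipped,
                                    std::size_t row) {
    assert(row > 0 && zipped.size() % row == 0);
    std::vector<int> minidx;
    for (std::size_t start = 0; start < zipped.size(); start += row) {
        std::pair<float, int> best = zipped[start];
        for (std::size_t r = 1; r < row; ++r) {
            // std::pair compares the value first and the index only on ties.
            best = std::min(best, zipped[start + r]);
        }
        minidx.push_back(best.second);
    }
    return minidx;
}
```

Fed with the zipped input and indexes from the question, segment by segment, this returns (0,1,1,0,1).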
The whole program is shown as follows. Thrust 1.6.0+ is required since I use placeholders in fancy iterators.
#include <iostream>
#include <iterator>
#include <algorithm>
#include <thrust/device_vector.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/transform_iterator.h>
#include <thrust/iterator/permutation_iterator.h>
#include <thrust/iterator/zip_iterator.h>
#include <thrust/iterator/discard_iterator.h>
#include <thrust/reduce.h>
#include <thrust/functional.h>
using namespace thrust::placeholders;
int main()
{
    const int row = 2;
    const int col = 5;
    float initc[] =
        { 0, 10, 20, 3, 40, 1, 2, 3, 5, 10 };
    thrust::device_vector<float> c(initc, initc + row * col);
    thrust::device_vector<float> minval(col);
    thrust::device_vector<int> minidx(col);
    thrust::reduce_by_key(
        thrust::make_transform_iterator(
            thrust::make_counting_iterator((int) 0),
            _1 / row),
        thrust::make_transform_iterator(
            thrust::make_counting_iterator((int) 0),
            _1 / row) + row * col,
        thrust::make_zip_iterator(
            thrust::make_tuple(
                thrust::make_permutation_iterator(
                    c.begin(),
                    thrust::make_transform_iterator(
                        thrust::make_counting_iterator((int) 0), (_1 % row) * col + _1 / row)),
                thrust::make_transform_iterator(
                    thrust::make_counting_iterator((int) 0), _1 % row))),
        thrust::make_discard_iterator(),
        thrust::make_zip_iterator(
            thrust::make_tuple(
                minval.begin(),
                minidx.begin())),
        thrust::equal_to<int>(),
        thrust::minimum<thrust::tuple<float, int> >()
    );
    std::copy(minidx.begin(), minidx.end(), std::ostream_iterator<int>(std::cout, " "));
    std::cout << std::endl;
    return 0;
}
Two remaining issues may affect the performance.
reduce_by_key is designed for segments of varying length, so it may not be the fastest algorithm for reducing segments that all have the same length. Writing your own kernel could be the best solution for highest performance.
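One shape such a kernel could take is one thread per column, with each thread scanning its own column. The sketch below is plain host C++, not CUDA: the outer loop index j stands in for the thread index, the name column_argmin is hypothetical, and it makes no claim about actual GPU performance.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Host-side mock-up of a "one thread per column" argmin over a row-major
// row x col matrix. column_argmin is a hypothetical name; in a real kernel
// the body of the outer loop would run once per thread.
std::vector<int> column_argmin(const std::vector<float>& c, int row, int col) {
    assert(row > 0 && col > 0);
    assert(c.size() == static_cast<std::size_t>(row) * static_cast<std::size_t>(col));
    std::vector<int> minidx(col);
    for (int j = 0; j < col; ++j) {          // j plays the role of the thread index
        int best = 0;
        for (int r = 1; r < row; ++r) {
            // Strict < keeps the lowest row index when values tie.
            if (c[r * col + j] < c[best * col + j]) {
                best = r;
            }
        }
        minidx[j] = best;
    }
    return minidx;
}
```

With the row-major layout, neighbouring values of j read neighbouring addresses in each row, which is the access pattern a per-column kernel would want.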
One possible idea, building on the vectorized sort idea here
1. Suppose I have vectors like this:
values: C = ( 0,10,20, 3,40, 1, 2, 3, 5,10)
keys: K = ( 0, 1, 2, 3, 4, 0, 1, 2, 3, 4)
segments: S = ( 0, 0, 0, 0, 0, 1, 1, 1, 1, 1)
2. zip together K and S to create KS
3. stable_sort_by_key using C as the keys, and KS as the values:
stable_sort_by_key(C.begin(), C.end(), KS_begin);
4. zip together the reordered C and K vectors, to create CK
5. stable_sort_by_key using the reordered S as the keys, and CK as the values:
stable_sort_by_key(S.begin(), S.end(), CK_begin);
6. use a permutation iterator or a strided range iterator to access every Nth element (0, N, 2N, ...) of the newly re-ordered K vector, to retrieve a vector of the indices of the min element in each segment, where N is the length of the segments.
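The two-pass idea can be tried out on the host before committing to Thrust. The sketch below is an assumption-laden stand-in: it uses std::stable_sort on an index permutation instead of stable_sort_by_key on zipped vectors, the helper name two_pass_argmin is hypothetical, and it assumes every group has exactly group_len elements.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <numeric>
#include <vector>

// Hypothetical helper: sort by value, then stably by group id, then read the
// payload of the first (smallest-valued) element of each group.
std::vector<int> two_pass_argmin(const std::vector<float>& values,
                                 const std::vector<int>& group,
                                 const std::vector<int>& payload,
                                 std::size_t group_len) {
    assert(group_len > 0 && values.size() % group_len == 0);
    assert(group.size() == values.size() && payload.size() == values.size());
    std::vector<std::size_t> order(values.size());
    std::iota(order.begin(), order.end(), std::size_t{0});
    // Pass 1: order all elements by value.
    std::stable_sort(order.begin(), order.end(),
                     [&](std::size_t a, std::size_t b) { return values[a] < values[b]; });
    // Pass 2: regroup by group id; stability keeps values ascending within a group.
    std::stable_sort(order.begin(), order.end(),
                     [&](std::size_t a, std::size_t b) { return group[a] < group[b]; });
    std::vector<int> result;
    for (std::size_t i = 0; i < order.size(); i += group_len) {
        result.push_back(payload[order[i]]);  // first element of each group
    }
    return result;
}
```

Calling it with S as the group and K as the payload (group_len 5) gives the position of the minimum inside each segment. Swapping the roles, with K as the group and S as the payload (group_len 2), gives for each position the segment that holds the lowest value.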
I haven't actually implemented this; right now it's just an idea. Maybe it won't work for some reason I haven't observed yet.
segments (S) and keys (K) are effectively row and column indices.
And your question seems weird to me, because your title mentions "find index of max value" but most of your question seems to be referring to "lowest value". Regardless, with a change to step 6 of my algorithm, you can find either value.