Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to optimize an indirect radix sort? (a.k.a. how to optimize unpredictable memory access patterns)

I've written an indirect radix sort algorithm in C++ (by indirect, I mean it returns the indices of the items):

#include <algorithm>
#include <iterator>
#include <vector>

template<class It1, class It2>
void radix_ipass(
    It1 begin, It1 const end,
    It2 const a, size_t const i,
    std::vector<std::vector<size_t> > &buckets)
{
    size_t ncleared = 0;
    for (It1 j = begin; j != end; ++j)
    {
        size_t const k = a[*j][i];
        while (k >= ncleared && ncleared < buckets.size())
        { buckets[ncleared++].clear(); }
        if (k >= buckets.size())
        {
            buckets.resize(k + 1);
            ncleared = buckets.size();
        }
        buckets[k].push_back(size_t());
        using std::swap; swap(buckets[k].back(), *j);
    }
    for (std::vector<std::vector<size_t> >::iterator
        j = buckets.begin(); j != buckets.begin() + ncleared; j->clear(), ++j)
    {
        begin = std::swap_ranges(j->begin(), j->end(), begin);
    }
}

template<class It, class It2>
void radix_isort(It const begin, It const end, It2 const items)
{
    for (ptrdiff_t i = 0; i != end - begin; ++i) { items[i] = i; }
    size_t smax = 0;
    for (It i = begin; i != end; ++i)
    {
        size_t const n = i->size();
        smax = n > smax ? n : smax;
    }
    std::vector<std::vector<size_t> > buckets;
    for (size_t i = 0; i != smax; ++i)
    {
        radix_ipass(
            items, items + (end - begin),
            begin, smax - i - 1, buckets);
    }
}

It seems to perform around 40% faster than std::sort when I test it with the following code (3920 ms compared to 6530 ms):

#include <functional>

template<class Key>
struct key_comp : public Key
{
    explicit key_comp(Key const &key = Key()) : Key(key) { }
    template<class T>
    bool operator()(T const &a, T const &b) const
    { return this->Key::operator()(a) < this->Key::operator()(b); }
};

template<class Key>
key_comp<Key> make_key_comp(Key const &key) { return key_comp<Key>(key); }

template<class T1, class T2>
struct add : public std::binary_function<T1, T2, T1>
{ T1 operator()(T1 a, T2 const &b) const { return a += b; } };

template<class F>
struct deref : public F
{
    deref(F const &f) : F(f) { }
    typename std::iterator_traits<
        typename F::result_type
    >::value_type const
        &operator()(typename F::argument_type const &a) const
    { return *this->F::operator()(a); }
};

template<class T> deref<T> make_deref(T const &t) { return deref<T>(t); }

size_t xorshf96(void)  // random number generator
{
    static size_t x = 123456789, y = 362436069, z = 521288629;
    x ^= x << 16;
    x ^= x >> 5;
    x ^= x << 1;
    size_t t = x;
    x = y;
    y = z;
    z = t ^ x ^ y;
    return z;
}

#include <stdio.h>
#include <time.h>

#include <array>

int main(void)
{
    typedef std::vector<std::array<size_t, 3> > Items;
    Items items(1 << 24);
    std::vector<size_t> ranks(items.size() * 2);
    for (size_t i = 0; i != items.size(); i++)
    {
        ranks[i] = i;
        for (size_t j = 0; j != items[i].size(); j++)
        { items[i][j] = xorshf96() & 0xFFF; }
    }
    clock_t const start = clock();
    if (1) { radix_isort(items.begin(), items.end(), ranks.begin()); }
    else  // STL sorting
    {
        std::sort(
            ranks.begin(),
            ranks.begin() + items.size(),
            make_key_comp(make_deref(std::bind1st(
                add<Items::const_iterator, ptrdiff_t>(),
                items.begin()))));
    }
    printf("%u ms\n",
        (unsigned)((clock() - start) * 1000 / CLOCKS_PER_SEC),
        std::min(ranks.begin(), ranks.end()));
    return 0;
}

Hmm, I guess that's the best I can do, I thought.

But after lots of banging my head against the wall, I realized that prefetching in the beginning of radix_ipass can help cut down the result to 1440 ms (!):

#include <xmmintrin.h>

...

    for (It1 j = begin; j != end; ++j)
    {
#if defined(_MM_TRANSPOSE4_PS)  // should be defined if xmmintrin.h is included
        enum { N = 8 };
        if (end - j > N)
        { _mm_prefetch((char const *)(&a[j[N]][i]), _MM_HINT_T0); }
#endif
        ...
    }

Clearly, the bottleneck is the memory bandwidth---the access pattern is unpredictable.

So now my question is: what else can I do to make it even faster on similar amounts of data?

Or is there not much room left for improvement?

(I'm hoping to avoid compromising the readability of the code if possible, so if the readability is harmed, the improvement should be significant.)

like image 288
user541686 Avatar asked Nov 15 '13 11:11

user541686


People also ask

What is radix sort in data structure?

Algorithms Sorting Algorithm Data Structure Radix sort is a non-comparative sorting algorithm. This sorting algorithm works on the integer keys by grouping digits which share the same position and value. The radix is the base of a number system.

Why is Radix better than quick sort?

If we have log 2n bits for every digit, the running time of Radix appears to be better than Quick Sort for a wide range of input numbers. The constant factors hidden in asymptotic notation are higher for Radix Sort and Quick-Sort uses hardware caches more effectively.

How does this sorting algorithm work on integer keys?

This sorting algorithm works on the integer keys by grouping digits which share the same position and value. The radix is the base of a number system. As we know that in the decimal system the radix or base is 10. So for sorting some decimal numbers, we need 10 positional boxes to store numbers.

What is the lower bound for comparison based sorting algorithm?

The lower bound for Comparison based sorting algorithm (Merge Sort, Heap Sort, Quick-Sort .. etc) is Ω (nLogn), i.e., they cannot do better than nLogn. Counting sort is a linear time sorting algorithm that sort in O (n+k) time when elements are in the range from 1 to k.


1 Answers

Using a more compact data structure that combines ranks and values can boost the performance of std::sort by a factor 2-3. Essentially, the sort now runs on a vector<pair<Value,Rank>>. The Value data type, std::array<integer_type, 3> has been replaced for this by a more compact pair<uint32_t, uint8_t> data structure. Only half a byte of it is unused, and the < comparison can by done in two steps, first using a presumably efficient comparison of uint32_ts (it's not clear if the loop used by std::array<..>::operator< can be optimized to a similarly fast code, but the replacement of std::array<integer_type,3> by this data structure yielded another performance boost).

Still, it doesn't get as efficient as the radix sort. (Maybe you could tweak a custom QuickSort with prefetches?)

Besides that additional sorting method, I've replaced the xorshf96 by a mt19937, because I know how to provide a seed for the latter ;)

The seed and the number of values can be changed via two command-line arguments: first the seed, then the count.

Compiled with g++ 4.9.0 20131022, using -std=c++11 -march=native -O3, for a 64-bit linux

Sample runs; important note running on a Core2Duo processor U9400 (old & slow!)

item count: 16000000
using std::sort
duration: 12260 ms
result sorted: true

seed: 5648
item count: 16000000
using std::sort
duration: 12230 ms
result sorted: true

seed: 5648
item count: 16000000
using std::sort
duration: 12230 ms
result sorted: true


seed: 5648
item count: 16000000
using std::sort with a packed data structure
duration: 4290 ms
result sorted: true

seed: 5648
item count: 16000000
using std::sort with a packed data structure
duration: 4270 ms
result sorted: true

seed: 5648
item count: 16000000
using std::sort with a packed data structure
duration: 4280 ms
result sorted: true


item count: 16000000
using radix sort
duration: 3790 ms
result sorted: true

seed: 5648
item count: 16000000
using radix sort
duration: 3820 ms
result sorted: true

seed: 5648
item count: 16000000
using radix sort
duration: 3780 ms
result sorted: true

New or changed code:

template<class It>
struct fun_obj
{
        It beg;
        bool operator()(ptrdiff_t lhs, ptrdiff_t rhs)
        {
                return beg[lhs] < beg[rhs];
        }
};

template<class It>
fun_obj<It> make_fun_obj(It beg)
{
        return fun_obj<It>{beg};
}

struct uint32p8_t
{
        uint32_t m32;
        uint8_t m8;

        uint32p8_t(std::array<uint16_t, 3> const& a)
                : m32( a[0]<<(32-3*4) | a[1]<<(32-2*3*4) | (a[2]&0xF00)>>8)
                , m8( a[2]&0xFF )
        {
        }

        operator std::array<size_t, 3>() const
        {
                return {{m32&0xFFF00000 >> (32-3*4), m32&0x000FFF0 >> (32-2*3*4),
                         (m32&0xF)<<8 | m8}};
        }

        friend bool operator<(uint32p8_t const& lhs, uint32p8_t const& rhs)
        {
                if(lhs.m32 < rhs.m32) return true;
                if(lhs.m32 > rhs.m32) return false;
                return lhs.m8 < rhs.m8;
        }
};

#include <stdio.h>
#include <time.h>

#include <array>
#include <iostream>
#include <iomanip>
#include <utility>
#include <algorithm>
#include <cstdlib>
#include <iomanip>
#include <random>

int main(int argc, char* argv[])
{
    std::cout.sync_with_stdio(false);

    constexpr auto items_count_default = 2<<22;
    constexpr auto seed_default = 42;

    uint32_t const seed = argc > 1 ? std::atoll(argv[1]) : seed_default;
    std::cout << "seed: " << seed << "\n";

    size_t const items_count = argc > 2 ? std::atoll(argv[2])
                                        : items_count_default;
    std::cout << "item count: " << items_count << "\n";

    using Items_array_value_t =
        #ifdef RADIX_SORT
            size_t
        #elif defined(STDSORT)
            uint16_t
        #elif defined(STDSORT_PACKED)
            uint16_t
        #endif
    ;

    typedef std::vector<std::array<Items_array_value_t, 3> > Items;
    Items items(items_count);

    auto const ranks_count =
        #ifdef RADIX_SORT
            items.size() * 2
        #elif defined(STDSORT)
            items.size()
        #elif defined(STDSORT_PACKED)
            items.size()
    #endif
    ;

    //auto prng = xorshf96;
    std::mt19937 gen(seed);
    std::uniform_int_distribution<> dist;
    auto prng = [&dist, &gen]{return dist(gen);};

    std::vector<size_t> ranks(ranks_count);
    for (size_t i = 0; i != items.size(); i++)
    {
        ranks[i] = i;

        for (size_t j = 0; j != items[i].size(); j++)
        { items[i][j] = prng() & 0xFFF; }
    }

    std::cout << "using ";
    clock_t const start = clock();
    #ifdef RADIX_SORT
        std::cout << "radix sort\n";
        radix_isort(items.begin(), items.end(), ranks.begin());
    #elif defined(STDSORT)
        std::cout << "std::sort\n";
        std::sort(ranks.begin(), ranks.begin() + items.size(),
                  make_fun_obj(items.cbegin())
                  //make_key_comp(make_deref(std::bind1st(
                  //    add<Items::const_iterator, ptrdiff_t>(),
                  //    items.begin())))
                 );
    #elif defined(STDSORT_PACKED)
        std::cout << "std::sort with a packed data structure\n";
        using Items_ranks = std::vector< std::pair<uint32p8_t,
                                         decltype(ranks)::value_type> >;
        Items_ranks items_ranks;

        size_t i = 0;
        for(auto iI = items.cbegin(); iI != items.cend(); ++iI, ++i)
        {
            items_ranks.emplace_back(*iI, i);
        }

        std::sort(begin(items_ranks), end(items_ranks),
                  [](Items_ranks::value_type const& lhs,
                     Items_ranks::value_type const& rhs)
                  { return lhs.first < rhs.first; }
                 );
        std::transform(items_ranks.cbegin(), items_ranks.cend(), begin(ranks),
                       [](Items_ranks::value_type const& e) { return e.second; }
                      );
    #endif
    auto const duration = (clock() - start) / (CLOCKS_PER_SEC / 1000);

    bool const sorted = std::is_sorted(ranks.begin(), ranks.begin() + items.size(),
                                       make_fun_obj(items.cbegin()));

    std::cout << "duration: " << duration << " ms\n"
              << "result sorted: " << std::boolalpha << sorted << "\n";

    return 0;
}

Full code:

#include <algorithm>
#include <iterator>
#include <vector>

#include <cstddef>
using std::size_t;
using std::ptrdiff_t;

#include <xmmintrin.h>

template<class It1, class It2>
void radix_ipass(
    It1 begin, It1 const end,
    It2 const a, size_t const i,
    std::vector<std::vector<size_t> > &buckets)
{
    size_t ncleared = 0;
    for (It1 j = begin; j != end; ++j)
    {
        #if defined(_MM_TRANSPOSE4_PS)
            constexpr auto N = 8;
            if(end - j > N)
            { _mm_prefetch((char const *)(&a[j[N]][i]), _MM_HINT_T0); }
        #else
            #error SS intrinsic not found
        #endif

        size_t const k = a[*j][i];
        while (k >= ncleared && ncleared < buckets.size())
        { buckets[ncleared++].clear(); }
        if (k >= buckets.size())
        {
            buckets.resize(k + 1);
            ncleared = buckets.size();
        }
        buckets[k].push_back(size_t());
        using std::swap; swap(buckets[k].back(), *j);
    }
    for (std::vector<std::vector<size_t> >::iterator
        j = buckets.begin(); j != buckets.begin() + ncleared; j->clear(), ++j)
    {
        begin = std::swap_ranges(j->begin(), j->end(), begin);
    }
}

template<class It, class It2>
void radix_isort(It const begin, It const end, It2 const items)
{
    for (ptrdiff_t i = 0; i != end - begin; ++i) { items[i] = i; }
    size_t smax = 0;
    for (It i = begin; i != end; ++i)
    {
        size_t const n = i->size();
        smax = n > smax ? n : smax;
    }
    std::vector<std::vector<size_t> > buckets;
    for (size_t i = 0; i != smax; ++i)
    {
        radix_ipass(
            items, items + (end - begin),
            begin, smax - i - 1, buckets);
    }
}

#include <functional>

template<class Key>
struct key_comp : public Key
{
    explicit key_comp(Key const &key = Key()) : Key(key) { }
    template<class T>
    bool operator()(T const &a, T const &b) const
    { return this->Key::operator()(a) < this->Key::operator()(b); }
};

template<class Key>
key_comp<Key> make_key_comp(Key const &key) { return key_comp<Key>(key); }

template<class T1, class T2>
struct add : public std::binary_function<T1, T2, T1>
{ T1 operator()(T1 a, T2 const &b) const { return a += b; } };

template<class F>
struct deref : public F
{
    deref(F const &f) : F(f) { }
    typename std::iterator_traits<
        typename F::result_type
    >::value_type const
        &operator()(typename F::argument_type const &a) const
    { return *this->F::operator()(a); }
};

template<class T> deref<T> make_deref(T const &t) { return deref<T>(t); }

size_t xorshf96(void)  // random number generator
{
    static size_t x = 123456789, y = 362436069, z = 521288629;
    x ^= x << 16;
    x ^= x >> 5;
    x ^= x << 1;
    size_t t = x;
    x = y;
    y = z;
    z = t ^ x ^ y;
    return z;
}

template<class It>
struct fun_obj
{
        It beg;
        bool operator()(ptrdiff_t lhs, ptrdiff_t rhs)
        {
                return beg[lhs] < beg[rhs];
        }
};

template<class It>
fun_obj<It> make_fun_obj(It beg)
{
        return fun_obj<It>{beg};
}

struct uint32p8_t
{
        uint32_t m32;
        uint8_t m8;

        uint32p8_t(std::array<uint16_t, 3> const& a)
                : m32( a[0]<<(32-3*4) | a[1]<<(32-2*3*4) | (a[2]&0xF00)>>8)
                , m8( a[2]&0xFF )
        {
        }

        operator std::array<size_t, 3>() const
        {
                return {{m32&0xFFF00000 >> (32-3*4), m32&0x000FFF0 >> (32-2*3*4),
                         (m32&0xF)<<8 | m8}};
        }

        friend bool operator<(uint32p8_t const& lhs, uint32p8_t const& rhs)
        {
                if(lhs.m32 < rhs.m32) return true;
                if(lhs.m32 > rhs.m32) return false;
                return lhs.m8 < rhs.m8;
        }
};

#include <stdio.h>
#include <time.h>

#include <array>
#include <iostream>
#include <iomanip>
#include <utility>
#include <algorithm>
#include <cstdlib>
#include <iomanip>
#include <random>

int main(int argc, char* argv[])
{
    std::cout.sync_with_stdio(false);

    constexpr auto items_count_default = 2<<22;
    constexpr auto seed_default = 42;

    uint32_t const seed = argc > 1 ? std::atoll(argv[1]) : seed_default;
    std::cout << "seed: " << seed << "\n";
    size_t const items_count = argc > 2 ? std::atoll(argv[2]) : items_count_default;
    std::cout << "item count: " << items_count << "\n";

    using Items_array_value_t =
        #ifdef RADIX_SORT
            size_t
        #elif defined(STDSORT)
            uint16_t
        #elif defined(STDSORT_PACKED)
            uint16_t
        #endif
    ;

    typedef std::vector<std::array<Items_array_value_t, 3> > Items;
    Items items(items_count);

    auto const ranks_count =
        #ifdef RADIX_SORT
            items.size() * 2
        #elif defined(STDSORT)
            items.size()
        #elif defined(STDSORT_PACKED)
            items.size()
    #endif
    ;

    //auto prng = xorshf96;
    std::mt19937 gen(seed);
    std::uniform_int_distribution<> dist;
    auto prng = [&dist, &gen]{return dist(gen);};

    std::vector<size_t> ranks(ranks_count);
    for (size_t i = 0; i != items.size(); i++)
    {
        ranks[i] = i;

        for (size_t j = 0; j != items[i].size(); j++)
        { items[i][j] = prng() & 0xFFF; }
    }

    std::cout << "using ";
    clock_t const start = clock();
    #ifdef RADIX_SORT
        std::cout << "radix sort\n";
        radix_isort(items.begin(), items.end(), ranks.begin());
    #elif defined(STDSORT)
        std::cout << "std::sort\n";
        std::sort(ranks.begin(), ranks.begin() + items.size(),
                  make_fun_obj(items.cbegin())
                  //make_key_comp(make_deref(std::bind1st(
                  //    add<Items::const_iterator, ptrdiff_t>(),
                  //    items.begin())))
                 );
    #elif defined(STDSORT_PACKED)
        std::cout << "std::sort with a packed data structure\n";
        using Items_ranks = std::vector< std::pair<uint32p8_t,
                                         decltype(ranks)::value_type> >;
        Items_ranks items_ranks;

        size_t i = 0;
        for(auto iI = items.cbegin(); iI != items.cend(); ++iI, ++i)
        {
            items_ranks.emplace_back(*iI, i);
        }

        std::sort(begin(items_ranks), end(items_ranks),
                  [](Items_ranks::value_type const& lhs,
                     Items_ranks::value_type const& rhs)
                  { return lhs.first < rhs.first; }
                 );
        std::transform(items_ranks.cbegin(), items_ranks.cend(), begin(ranks),
                       [](Items_ranks::value_type const& e) { return e.second; }
                      );
    #endif
    auto const duration = (clock() - start) / (CLOCKS_PER_SEC / 1000);

    bool const sorted = std::is_sorted(ranks.begin(), ranks.begin() + items.size(),
                                       make_fun_obj(items.cbegin()));

    std::cout << "duration: " << duration << " ms\n"
              << "result sorted: " << std::boolalpha << sorted << "\n";

    return 0;
}
like image 188
2 revs Avatar answered Sep 25 '22 23:09

2 revs