Parallel for_each more than two times slower than std::for_each

I'm reading C++ Concurrency in Action by Anthony Williams. In the chapter about designing concurrent code there is a parallel version of the std::for_each algorithm. Here is the slightly modified code from the book:

join_threads.hpp

#pragma once

#include <vector>
#include <thread>

// RAII guard: joins every joinable thread when the guard goes out of scope.
class join_threads
{
public:
  explicit join_threads(std::vector<std::thread>& threads)
    : threads_(threads) {}

  ~join_threads()
  {
    for (size_t i = 0; i < threads_.size(); ++i)
    {
      if(threads_[i].joinable())
      {
        threads_[i].join();
      }
    }
  }

private:
  std::vector<std::thread>& threads_;
};

parallel_for_each.hpp

#pragma once

#include <future>
#include <algorithm>

#include "join_threads.hpp"

template<typename Iterator, typename Func>
void parallel_for_each(Iterator first, Iterator last, Func func)
{
  const auto length = std::distance(first, last);
  if (0 == length) return;

  // Require at least 25 elements per thread so that the work per block
  // outweighs the cost of spawning a thread.
  const auto min_per_thread = 25u;
  const unsigned max_threads = (length + min_per_thread - 1) / min_per_thread;

  const auto hardware_threads = std::thread::hardware_concurrency();

  const auto num_threads = std::min(hardware_threads != 0 ?
        hardware_threads : 2u, max_threads);

  const auto block_size = length / num_threads;

  std::vector<std::future<void>> futures(num_threads - 1);
  std::vector<std::thread> threads(num_threads - 1);
  join_threads joiner(threads);

  auto block_start = first;
  for (unsigned i = 0; i < num_threads - 1; ++i)
  {
    auto block_end = block_start;
    std::advance(block_end, block_size);

    // Wrap each block's work in a packaged_task so that exceptions are
    // captured and can be rethrown via the corresponding future.
    std::packaged_task<void()> task([block_start, block_end, func]()
    {
      std::for_each(block_start, block_end, func);
    });
    futures[i] = task.get_future();
    threads[i] = std::thread(std::move(task));
    block_start = block_end;
  }

  // The calling thread handles the final block itself.
  std::for_each(block_start, last, func);

  // Wait for the worker threads and propagate any exceptions.
  for (size_t i = 0; i < num_threads - 1; ++i)
  {
    futures[i].get();
  }
}

I benchmarked it against the sequential std::for_each using the following program:

main.cpp

#include <iostream>
#include <random>
#include <chrono>
#include <atomic>
#include <vector>
#include <cstdint>

#include "parallel_for_each.hpp"

using namespace std;

constexpr size_t ARRAY_SIZE = 500'000'000;
typedef std::vector<uint64_t> Array;

template <class FE, class F>
void test_for_each(const Array& a, FE fe, F f, atomic<uint64_t>& result)
{
  auto time_begin = chrono::high_resolution_clock::now();
  result = 0;
  fe(a.begin(), a.end(), f);
  auto time_end = chrono::high_resolution_clock::now();

  cout << "Result = " << result << endl;
  cout << "Time: " << chrono::duration_cast<chrono::milliseconds>(
            time_end - time_begin).count() << endl;
}

int main()
{
  random_device device;
  default_random_engine engine(device());
  uniform_int_distribution<unsigned> distribution(0, 255); // uint8_t is not a valid IntType for the distribution

  Array a;
  a.reserve(ARRAY_SIZE);

  cout << "Generating array ... " << endl;
  for (size_t i = 0; i < ARRAY_SIZE; ++i)
    a.push_back(distribution(engine));

  atomic<uint64_t> result;
  auto acc = [&result](uint64_t value) { result += value; };

  cout << "parallel_for_each ..." << endl;
  test_for_each(a, parallel_for_each<Array::const_iterator, decltype(acc)>, acc, result);
  cout << "for_each ..." << endl;
  test_for_each(a, for_each<Array::const_iterator, decltype(acc)>, acc, result);

  return 0;
}

The parallel version of the algorithm on my machine is more than two times slower than the sequential one:

parallel_for_each ...
Result = 63750301073
Time: 5448
for_each ...
Result = 63750301073
Time: 2496

I'm using GCC 6.2 compiler on Ubuntu Linux running on Intel(R) Core(TM) i3-6100 CPU @ 3.70GHz.

How can such behavior be explained? Is this because the atomic<uint64_t> variable is shared between threads, causing cache ping-pong?

I profiled both separately with perf. For the parallel version the stats are the following:

 1137982167      cache-references                                            
  247652893      cache-misses              #   21,762 % of all cache refs    
60868183996      cycles                                                      
27409239189      instructions              #    0,45  insns per cycle        
 3287117194      branches                                                    
      80895      faults                                                      
          4      migrations

And for the sequential one:

  402791485      cache-references                                            
  246561299      cache-misses              #   61,213 % of all cache refs    
40284812779      cycles                                                      
26515783790      instructions              #    0,66  insns per cycle
 3188784664      branches                                                    
      48179      faults
          3      migrations

It is obvious that the parallel version generates far more cache references, cycles, and faults, but why?

asked Nov 25 '16 by bobeff


1 Answer

You are sharing the same result variable: all the threads are accumulating on atomic<uint64_t> result, thrashing the cache!

Every time a thread writes to result, all the caches in the other cores are invalidated: this leads to cache line contention.
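
Here is a minimal sketch that isolates the effect (my illustration, not code from the question): both passes sum the same data with four threads, but the first writes to the shared atomic for every element, while the second accumulates locally and performs one atomic write per thread. On a multi-core machine the contended pass is typically several times slower.

#include <atomic>
#include <chrono>
#include <cstdint>
#include <iostream>
#include <numeric>
#include <thread>
#include <vector>

int main()
{
    // The same data is summed twice by 4 threads; only the write pattern
    // to the shared atomic differs.
    std::vector<std::uint64_t> data(100'000'000, 1);
    const std::size_t chunk = data.size() / 4;

    auto run = [&](auto body)
    {
        const auto begin = std::chrono::high_resolution_clock::now();
        std::vector<std::thread> threads;
        for (std::size_t t = 0; t < 4; ++t)
            threads.emplace_back(body, data.begin() + t * chunk,
                                 data.begin() + (t + 1) * chunk);
        for (auto& th : threads) th.join();
        const auto end = std::chrono::high_resolution_clock::now();
        return std::chrono::duration_cast<std::chrono::milliseconds>(
            end - begin).count();
    };

    std::atomic<std::uint64_t> result{0};

    // Contended: every element forces exclusive ownership of the cache
    // line holding `result` (this is what the benchmark above does).
    const auto contended = run([&](auto first, auto last)
    {
        for (auto it = first; it != last; ++it) result += *it;
    });

    result = 0;

    // Uncontended: accumulate into a local variable, then write to the
    // shared cache line exactly once per thread.
    const auto uncontended = run([&](auto first, auto last)
    {
        const std::uint64_t local =
            std::accumulate(first, last, std::uint64_t{0});
        result += local;
    });

    std::cout << "contended:   " << contended << " ms\n"
              << "uncontended: " << uncontended << " ms\n";
}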

More information:

  • "Sharing Is the Root of All Contention".

    [...] to write to a memory location a core must additionally have exclusive ownership of the cache line containing that location. While one core has exclusive use, all other cores trying to write the same memory location must wait and take turns — that is, they must run serially. Conceptually, it's as if each cache line were protected by a hardware mutex, where only one core can hold the hardware lock on that cache line at a time.

  • This article on "false sharing", which covers a similar issue, explains in more depth what happens in the caches; the sketch after this list illustrates the effect.
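
False sharing bites even without a single shared variable. In this sketch (again my illustration, assuming a 64-byte cache line) each thread increments only its own counter, yet the packed layout is slow because neighbouring counters land on the same cache line:

#include <array>
#include <atomic>
#include <cstdint>
#include <thread>

// Counters packed next to each other: distinct objects, but several of
// them end up on the same 64-byte cache line.
struct packed_counter { std::atomic<std::uint64_t> value{0}; };

// Padding each counter to a full cache line removes the false sharing.
struct padded_counter { alignas(64) std::atomic<std::uint64_t> value{0}; };

template <typename Counters>
void increment_in_parallel(Counters& counters)
{
    std::array<std::thread, 4> threads;
    for (std::size_t t = 0; t < counters.size(); ++t)
        threads[t] = std::thread([&counters, t]
        {
            for (std::uint64_t i = 0; i < 100'000'000; ++i)
                counters[t].value.fetch_add(1, std::memory_order_relaxed);
        });
    for (auto& th : threads) th.join();
}

int main()
{
    std::array<packed_counter, 4> packed;
    std::array<padded_counter, 4> padded;
    increment_in_parallel(packed); // the shared line ping-pongs between cores
    increment_in_parallel(padded); // typically several times faster
}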


I made some modifications to your program and achieved the following results (on a machine with an i7-4770K: 4 cores, 8 hardware threads with hyperthreading):

Generating array ...
parallel_for_each ...
Result = 63748111806
Time: 195
for_each ...
Result = 63748111806
Time: 2727

The parallel version takes roughly 93% less time than the serial one (about a 14x speedup).


  1. std::future and std::packaged_task are heavyweight abstractions. In this case, a std::experimental::latch (standardized as std::latch in C++20) is sufficient.

  2. Every task is sent to a thread pool. This minimizes thread creation overhead (a minimal pool sketch follows after this list).

  3. Every task has its own accumulator. This eliminates sharing.

The code is available here on my GitHub. It uses some personal dependencies, but you should understand the changes regardless.
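
In the snippet further below, `p` is a thread pool from those personal dependencies. A minimal stand-in (a sketch of the idea, not the actual implementation) could look like this:

#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class thread_pool
{
public:
    explicit thread_pool(unsigned n)
    {
        for (unsigned i = 0; i < n; ++i)
            workers_.emplace_back([this] { run(); });
    }

    ~thread_pool()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            done_ = true;
        }
        cv_.notify_all();
        for (auto& w : workers_) w.join();
    }

    // Enqueue a task; an idle worker thread will pick it up.
    void post(std::function<void()> task)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        cv_.notify_one();
    }

private:
    void run()
    {
        for (;;)
        {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] { return done_ || !tasks_.empty(); });
                if (done_ && tasks_.empty()) return;
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();
        }
    }

    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex mutex_;
    std::condition_variable cv_;
    bool done_ = false;
};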


Here are the most important changes:

// A latch is being used instead of a vector of futures.
ecst::latch l(num_threads - 1);

l.execute_and_wait_until_zero([&]
{
    auto block_start = first;
    for (unsigned i = 0; i < num_threads - 1; ++i)
    {
        auto block_end = block_start;
        std::advance(block_end, block_size);

        // `p` is a thread pool.
        // Every task posted in the thread pool has its own `tempacc` accumulator.
        p.post([&, block_start, block_end, tempacc = 0ull]() mutable
        {
            // The task accumulator is filled up...
            std::for_each(block_start, block_end, [&tempacc](auto x){ tempacc += x; });

            // ...and then the atomic variable is incremented ONCE.
            func(tempacc);
            l.decrement_and_notify_all();
        });

        block_start = block_end;
    }

    // Same idea here: accumulate to local non-atomic counter, then
    // add the partial result to the atomic counter ONCE.
    auto tempacc2 = 0ull;
    std::for_each(block_start, last, [&tempacc2](auto x){ tempacc2 += x; });
    func(tempacc2);
});
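
For reference, the same structure can be expressed with just the standard library, trading the pool for freshly spawned threads (a sketch of the idea, not the code I benchmarked): each worker fills a local accumulator and touches the shared atomic exactly once.

#include <algorithm>
#include <atomic>
#include <cstdint>
#include <iterator>
#include <thread>
#include <vector>

template <typename Iterator>
std::uint64_t parallel_sum(Iterator first, Iterator last, unsigned num_threads)
{
    const auto length = std::distance(first, last);
    const auto block_size = length / num_threads;

    std::atomic<std::uint64_t> result{0};
    std::vector<std::thread> threads;

    auto block_start = first;
    for (unsigned i = 0; i < num_threads - 1; ++i)
    {
        auto block_end = std::next(block_start, block_size);
        threads.emplace_back([&result, block_start, block_end]
        {
            std::uint64_t local = 0;                 // per-thread accumulator
            std::for_each(block_start, block_end,
                          [&local](std::uint64_t v) { local += v; });
            result.fetch_add(local);                 // single atomic write
        });
        block_start = block_end;
    }

    std::uint64_t local = 0;                         // final block, this thread
    std::for_each(block_start, last,
                  [&local](std::uint64_t v) { local += v; });
    result.fetch_add(local);

    for (auto& t : threads) t.join();
    return result.load();
}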
answered Nov 14 '22 by Vittorio Romeo