Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Thread pooling in C++11

Relevant questions:

About C++11:

  • C++11: std::thread pooled?
  • Will async(launch::async) in C++11 make thread pools obsolete for avoiding expensive thread creation?

About Boost:

  • C++ boost thread reusing threads
  • boost::thread and creating a pool of them!

How do I get a pool of threads to send tasks to, without creating and deleting them over and over again? This means persistent threads to resynchronize without joining.


I have code that looks like this:

namespace {   std::vector<std::thread> workers;    int total = 4;   int arr[4] = {0};    void each_thread_does(int i) {     arr[i] += 2;   } }  int main(int argc, char *argv[]) {   for (int i = 0; i < 8; ++i) { // for 8 iterations,     for (int j = 0; j < 4; ++j) {       workers.push_back(std::thread(each_thread_does, j));     }     for (std::thread &t: workers) {       if (t.joinable()) {         t.join();       }     }     arr[4] = std::min_element(arr, arr+4);   }   return 0; } 

Instead of creating and joining threads each iteration, I'd prefer to send tasks to my worker threads each iteration and only create them once.

like image 692
Yktula Avatar asked Apr 01 '13 21:04

Yktula


1 Answers

This is copied from my answer to another very similar post:

  1. Start with maximum number of threads the system can support:

    int num_threads = std::thread::hardware_concurrency(); 
  2. For an efficient threadpool implementation, once threads are created according to num_threads, it's better not to create new ones, or destroy old ones (by joining). There will be a performance penalty, an it might even make your application go slower than the serial version.

Each C++11 thread should be running in their function with an infinite loop, constantly waiting for new tasks to grab and run.

Here is how to attach such a function to the thread pool:

int num_threads = std::thread::hardware_concurrency(); std::vector<std::thread> threads; for (int i = 0; i < num_threads; i++) {     pool.push_back(std::thread(Infinite_loop_function)); } 
  1. The infinite loop function. This is a while (true) loop waiting for the task queue.
void Pool::Infinite_loop_function() {     while (true)     {         {             std::unique_lock<std::mutex> lock(queue_mutex);              condition.wait(lock, [this](){                 return !queue.empty() || terminate_pool;             });             Job = queue.front();             queue.pop();         }          Job(); // function<void()> type     } }; 
  1. Make a function to add job to your queue
void Pool::Add_Job(function<void()> New_Job) {     {         std::unique_lock<std::mutex> lock(queue_mutex);         queue.push(New_Job);     }      condition.notify_one(); } 
  1. Bind an arbitrary function to your queue
Pool_Obj.Add_Job(std::bind(&Some_Class::Some_Method, &Some_object)); 

Once you integrate these ingredients, you have your own dynamic threading pool. These threads always run, waiting for job to do.

I apologize if there are some syntax errors, I typed this code and and I have a bad memory. Sorry that I cannot provide you the complete thread pool code; that would violate my job integrity.

Edit: to terminate the pool, call the shutdown() method:

Pool::shutdown() {     {         std::unique_lock<std::mutex> lock(threadpool_mutex);         terminate_pool = true; // use this flag in condition.wait     }      condition.notify_all(); // wake up all threads.      // Join all threads.     for (std::thread &th : threads)     {         th.join();     }      pool.clear();       stopped = true; // use this flag in destructor, if not set, call shutdown()  } 

Note: the anonymous code blocks are used so that when they are exited, the std::unique_lock variables created within them go out of scope, unlocking the mutex.

like image 192
PhD AP EcE Avatar answered Sep 18 '22 07:09

PhD AP EcE