
boost::asio, thread pools and thread monitoring

I've implemented a thread pool using boost::asio, with some number of boost::thread objects calling boost::asio::io_service::run(). However, one requirement I've been given is a way to monitor all threads for "health". My intent is to make a simple sentinel object that can be passed through the thread pool: if it makes it through, we can assume that the thread is still processing work.

However, given my implementation, I'm not sure how (or whether) I can reliably monitor every thread in the pool. Each thread function simply delegates to boost::asio::io_service::run(), so when I post a sentinel object into the io_service instance there is no guarantee which thread will pick it up and run it.

One option may be to just periodically insert the sentinel, and hope that it gets picked up by each thread at least once in some reasonable amount of time, but that obviously isn't ideal.

Take the following example. Because of how the handler is written, each thread here ends up doing about the same amount of work. In reality I will not control the handler implementations: some may be long-running while others finish almost immediately.

#include <iostream>
#include <memory>
#include <vector>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>

void handler()
{
   std::cout << boost::this_thread::get_id() << "\n";
   boost::this_thread::sleep(boost::posix_time::milliseconds(100));
}

int main(int argc, char **argv)
{
   boost::asio::io_service svc(3);

   std::unique_ptr<boost::asio::io_service::work> work(new boost::asio::io_service::work(svc));

   boost::thread one(boost::bind(&boost::asio::io_service::run, &svc));
   boost::thread two(boost::bind(&boost::asio::io_service::run, &svc));
   boost::thread three(boost::bind(&boost::asio::io_service::run, &svc));

   svc.post(handler);
   svc.post(handler);
   svc.post(handler);
   svc.post(handler);
   svc.post(handler);
   svc.post(handler);
   svc.post(handler);
   svc.post(handler);
   svc.post(handler);
   svc.post(handler);

   work.reset();

   three.join();
   two.join();
   one.join();

   return 0;
}
Chad asked Aug 28 '12 19:08


2 Answers

You can use one common io_service instance shared by all the threads, plus a private io_service instance for each thread. Every thread then executes a method like this:

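void Mythread::threadLoop()
{
    // Assumes each io_service is kept alive by an io_service::work object.
    // poll_one() runs at most one ready handler and never blocks, so a
    // busy common queue cannot starve the private one (run_one() would
    // block on whichever queue is empty).
    while(/* termination condition */)
    {
        commonIoService.poll_one();
        privateIoService.poll_one();

        // timed_wait needs a held lock; commonMutex is a hypothetical
        // boost::mutex member paired with the condition variable
        boost::unique_lock<boost::mutex> lock(commonMutex);
        commonConditionVariable.timed_wait(lock, time);
    }
}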

This way, if you want to ensure that a task is executed on a particular thread, you only have to post it to that thread's private io_service.

To post a task in your thread pool you can do:

void MyThreadPool::post(Handler handler)
{
    commonIoService.post(handler);
    commonConditionVariable.notify_all();
}
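
The layout described above (one common queue plus one private queue per thread) can be sketched with only the standard library. This is a rough illustration and not the answer author's code: TaskQueue is a hypothetical stand-in for an io_service, and MonitoredPool, check() and the 1 ms idle sleep are assumptions made for the example. The point it shows is that a sentinel posted to a thread's private queue can only be run by that thread, so a missing sentinel identifies the thread that is stuck.

```cpp
#include <atomic>
#include <chrono>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for an io_service: a mutex-protected FIFO of tasks.
class TaskQueue {
public:
    void post(std::function<void()> task) {
        std::lock_guard<std::mutex> lock(mutex_);
        tasks_.push_back(std::move(task));
    }
    // Runs at most one queued task without blocking; returns false if the
    // queue was empty.
    bool poll_one() {
        std::function<void()> task;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (tasks_.empty()) return false;
            task = std::move(tasks_.front());
            tasks_.pop_front();
        }
        task();  // run outside the lock so posting is never blocked by a task
        return true;
    }
private:
    std::mutex mutex_;
    std::deque<std::function<void()>> tasks_;
};

class MonitoredPool {
public:
    explicit MonitoredPool(std::size_t thread_count)
        : private_(thread_count), running_(true) {
        for (std::size_t i = 0; i < thread_count; ++i)
            threads_.emplace_back([this, i] { loop(i); });
    }
    ~MonitoredPool() {
        running_ = false;
        for (std::thread& t : threads_) t.join();
    }
    // Ordinary work goes to the common queue; any thread may run it.
    void post(std::function<void()> task) { common_.post(std::move(task)); }

    // Posts one sentinel to every private queue, waits for `timeout`, and
    // reports per thread whether its sentinel ran in that time.
    std::vector<bool> check(std::chrono::milliseconds timeout) {
        std::shared_ptr<Report> report = std::make_shared<Report>();
        report->seen.assign(private_.size(), false);
        for (std::size_t i = 0; i < private_.size(); ++i)
            private_[i].post([report, i] {
                std::lock_guard<std::mutex> lock(report->mutex);
                report->seen[i] = true;
            });
        std::this_thread::sleep_for(timeout);
        std::lock_guard<std::mutex> lock(report->mutex);
        return report->seen;
    }
private:
    struct Report { std::mutex mutex; std::vector<bool> seen; };

    // Each thread alternates between the common queue and its own queue.
    void loop(std::size_t i) {
        while (running_) {
            bool ran = common_.poll_one();
            ran = private_[i].poll_one() || ran;
            if (!ran) std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }
    TaskQueue common_;
    std::vector<TaskQueue> private_;
    std::atomic<bool> running_;
    std::vector<std::thread> threads_;  // declared last: started after the queues exist
};
```

A caller would construct MonitoredPool pool(3), post work with pool.post(...), and periodically call pool.check(...) with a timeout longer than the slowest handler it is willing to tolerate. A false entry means that thread did not reach its private queue in time, which may be a hang or simply a long-running handler.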
Andrés Senac answered Oct 01 '22 10:10


The solution that I used relies on the fact that I own the implementation of the thread pool objects. I created a wrapper type that copies each user-defined handler posted to the thread pool and updates statistics when it runs. Only this wrapper type is ever posted to the underlying io_service. This lets me keep track of the handlers that are posted and executed without being intrusive to the user code.

Here's a stripped down and simplified example:

#include <algorithm>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <utility>
#include <vector>
#include <boost/thread.hpp>
#include <boost/asio.hpp>

// Supports scheduling anonymous jobs that are
// executable as returning nothing and taking
// no arguments
typedef std::function<void(void)> functor_type;

// some way to store per-thread statistics
typedef std::map<boost::thread::id, int> thread_jobcount_map;

// only this type is actually posted to
// the asio proactor, this delegates to
// the user functor in operator()
struct handler_wrapper
{
   handler_wrapper(const functor_type& user_functor, thread_jobcount_map& statistics)
      : user_functor_(user_functor)
      , statistics_(statistics)
   {
   }

   void operator()()
   {
      user_functor_();

      // just for illustration purposes, assume a long running job
      boost::this_thread::sleep(boost::posix_time::milliseconds(100));

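      // increment executed jobs; at() only looks the key up (every pool
      // thread was inserted by the ThreadPool constructor), whereas
      // operator[] is a non-const call that may insert
      ++statistics_.at(boost::this_thread::get_id());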
   }

   functor_type         user_functor_;
   thread_jobcount_map& statistics_;
};

// anonymous thread function, just runs the proactor
void thread_func(boost::asio::io_service& proactor)
{
   proactor.run();
}

class ThreadPool
{
public:
   ThreadPool(size_t thread_count)
   {
      threads_.reserve(thread_count);

      work_.reset(new boost::asio::io_service::work(proactor_));

      for(size_t curr = 0; curr < thread_count; ++curr)
      {
         boost::thread th(thread_func, boost::ref(proactor_));

         // inserting into this map before any work can be scheduled
         // on it means that we don't have to lock it for lookups,
         // since we don't dynamically add threads
         thread_jobcount_.insert(std::make_pair(th.get_id(), 0));

         threads_.emplace_back(std::move(th));
      }
   }

   // the only way for a user to get work into 
   // the pool is to use this function, which ensures
   // that the handler_wrapper type is used
   void schedule(const functor_type& user_functor)
   {
      handler_wrapper to_execute(user_functor, thread_jobcount_);
      proactor_.post(to_execute);
   }

   void join()
   {
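      // join all threads in pool: dropping the work object lets run()
      // return, and stop() makes it return as soon as possible, so
      // handlers that are still queued at this point are not run
      work_.reset();
      proactor_.stop();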

      std::for_each(
         threads_.begin(),
         threads_.end(),
         [] (boost::thread& t)
      {
         t.join();
      });
   }

   // just an example showing statistics
   void log()
   {
      std::for_each(
         thread_jobcount_.begin(),
         thread_jobcount_.end(),
         [] (const thread_jobcount_map::value_type& it)
      {
         std::cout << "Thread: " << it.first << " executed " << it.second << " jobs\n";
      });
   }

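private:
   // proactor_ is declared first so that it is destroyed last,
   // after work_ and threads_, which both refer to it
   boost::asio::io_service    proactor_;
   thread_jobcount_map        thread_jobcount_;
   std::unique_ptr<boost::asio::io_service::work> work_;
   std::vector<boost::thread> threads_;
};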

struct add
{
   add(int lhs, int rhs, int* result)
      : lhs_(lhs)
      , rhs_(rhs)
      , result_(result)
   {
   }

   void operator()()
   {
      *result_ = lhs_ + rhs_;
   }

   int lhs_,rhs_;
   int* result_;
};

int main(int argc, char **argv)
{
   // some "state objects" that are 
   // manipulated by the user functors
   int x = 0, y = 0, z = 0;

   // pool of three threads
   ThreadPool pool(3);

   // schedule some handlers to do some work
   pool.schedule(add(5, 4, &x));
   pool.schedule(add(2, 2, &y));
   pool.schedule(add(7, 8, &z));

   // give all the handlers time to execute
   boost::this_thread::sleep(boost::posix_time::milliseconds(1000));

   std::cout
      << "x = " << x << "\n"
      << "y = " << y << "\n"
      << "z = " << z << "\n";

   pool.join();

   pool.log();
}

Output:

x = 9
y = 4
z = 15
Thread: 0000000000B25430 executed 1 jobs
Thread: 0000000000B274F0 executed 1 jobs
Thread: 0000000000B27990 executed 1 jobs
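
Job counts show which threads have done work, but not whether a thread is currently stuck inside a handler. One possible extension of the wrapper idea, sketched here with only the standard library, also records when each handler started. ActivityTracker, wrap(), stalled() and completed() are hypothetical names for this illustration and are not part of the pool above. The functor returned by wrap() could be posted to an io_service in the same way handler_wrapper is.

```cpp
#include <chrono>
#include <functional>
#include <map>
#include <mutex>
#include <thread>
#include <vector>

class ActivityTracker {
public:
    typedef std::chrono::steady_clock clock;

    // Returns a functor that records, for whichever thread runs it, when the
    // user functor started, and counts it as completed when it returns.
    // The tracker must outlive every functor it hands out.
    std::function<void()> wrap(std::function<void()> user_functor) {
        return [this, user_functor] {
            const std::thread::id id = std::this_thread::get_id();
            {
                std::lock_guard<std::mutex> lock(mutex_);
                started_[id] = clock::now();
            }
            user_functor();  // if this throws, the entry stays marked as running
            std::lock_guard<std::mutex> lock(mutex_);
            started_.erase(id);
            ++completed_[id];
        };
    }

    // Threads whose current handler has been running longer than max_age.
    std::vector<std::thread::id> stalled(std::chrono::milliseconds max_age) const {
        const clock::time_point now = clock::now();
        std::vector<std::thread::id> result;
        std::lock_guard<std::mutex> lock(mutex_);
        for (const auto& entry : started_)
            if (now - entry.second > max_age) result.push_back(entry.first);
        return result;
    }

    // Number of handlers the given thread has finished so far.
    int completed(std::thread::id id) const {
        std::lock_guard<std::mutex> lock(mutex_);
        std::map<std::thread::id, int>::const_iterator it = completed_.find(id);
        return it == completed_.end() ? 0 : it->second;
    }

private:
    mutable std::mutex mutex_;
    std::map<std::thread::id, clock::time_point> started_;
    std::map<std::thread::id, int> completed_;
};
```

A monitoring thread could call stalled() on a timer and log or alert on any thread ids it returns. Because every update takes a mutex, this costs more than the unlocked per-thread counters above. Whether that matters depends on how short the handlers are.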
Chad answered Oct 01 '22 09:10