Example of dynamic thread pool in boost::asio

I'm going to implement a boost::asio server with a thread pool that shares a single io_service (as in the HTTP Server 3 example). The io_service will be bound to a Unix domain socket and will pass requests arriving on connections to this socket to different threads. To reduce resource consumption, I want to make the thread pool dynamic.

Here is the concept. At first, a single thread is created. When a request arrives and the server sees that there is no idle thread in the pool, it creates a new thread and passes the request to it. The server can create up to some maximum number of threads. Ideally, it should also be able to suspend threads that have been idle for some time.

Has anybody built something similar, or does anybody have a relevant example?

As for me, I guess I should somehow override io_service.dispatch to achieve that.

boqapt asked Jun 20 '12 13:06

1 Answer

There may be a few challenges with the initial approach:

  • boost::asio::io_service is not intended to be derived from or reimplemented. Note the lack of virtual functions.
  • If your thread library does not provide the ability to query a thread's state, then state information needs to be managed separately.
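
To illustrate what managing that state separately could look like, here is a minimal sketch using only the standard library. The busy_counter class and its member names are hypothetical and are not part of Boost.Asio or Boost.Thread; the sketch assumes every handler is wrapped before it is posted, so the counter reflects how many handlers are currently running.

```cpp
#include <atomic>
#include <functional>

// Hypothetical bookkeeping class (an assumption for illustration, not a
// Boost API): counts how many wrapped handlers are executing right now.
class busy_counter
{
public:
  busy_counter() : busy_( 0 ) {}

  // Wrap a handler so the counter is raised for as long as it runs.
  std::function<void()> wrap( std::function<void()> handler )
  {
    return [this, handler]()
    {
      scoped_busy guard( busy_ ); // Lowered again even if handler throws.
      handler();
    };
  }

  // Number of wrapped handlers currently running.
  unsigned int busy() const { return busy_.load(); }

  // True when at least pool_size handlers are running, i.e. no thread
  // of a pool with pool_size threads is idle.
  bool all_busy( unsigned int pool_size ) const
  {
    return busy() >= pool_size;
  }

private:
  // RAII guard that increments on entry and decrements on exit.
  struct scoped_busy
  {
    explicit scoped_busy( std::atomic<unsigned int>& c ) : c_( c ) { ++c_; }
    ~scoped_busy() { --c_; }
    std::atomic<unsigned int>& c_;
  };

  std::atomic<unsigned int> busy_;
};
```

The wrapped callable takes no arguments, so it could be passed to io_service.post() in place of the raw handler. The cost is that every handler has to go through wrap(), which couples the bookkeeping to the rest of the code.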

An alternative solution is to post a job into the io_service and then check how long it sat in the queue. If the time delta between when the job became ready to run and when it actually ran exceeds a certain threshold, there are more jobs in the queue than threads servicing it. A major benefit of this approach is that the logic for growing the thread pool is decoupled from the other logic.
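
As a rough, library-independent sketch of that measurement: the helper below stamps a job when it is created (just before it would be posted) and reports how long it waited once a thread finally invokes it. The names queue_clock, make_timed_job and queue_is_backing_up are hypothetical and chosen for illustration only; nothing here is a Boost.Asio API.

```cpp
#include <chrono>
#include <functional>

// Assumption: a monotonic clock is used so that wall-clock adjustments
// do not distort the measured wait.
typedef std::chrono::steady_clock queue_clock;

// Hypothetical helper: returns a callable that, when invoked, first
// reports the time elapsed since make_timed_job() was called and then
// runs the original handler.
inline std::function<void()> make_timed_job(
    std::function<void()> handler,
    std::function<void( queue_clock::duration )> on_delay )
{
  const queue_clock::time_point posted_at = queue_clock::now();
  return [handler, on_delay, posted_at]()
  {
    on_delay( queue_clock::now() - posted_at ); // Time spent in the queue.
    handler();
  };
}

// True when a job waited longer than the threshold, which suggests there
// are more jobs queued than threads servicing the queue.
inline bool queue_is_backing_up( queue_clock::duration waited,
                                 queue_clock::duration threshold )
{
  return waited > threshold;
}
```

The returned callable takes no arguments, so it could be handed to io_service.post() directly, and the on_delay callback is where a decision to add a thread would be made.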

Here is an example that accomplishes this by using the deadline_timer.

  • Set deadline_timer to expire 3 seconds from now.
  • Asynchronously wait on the deadline_timer. The handler will be ready-to-run 3 seconds from when the deadline_timer was set.
  • In the asynchronous handler, check the current time relative to when the timer was set to expire. If it is greater than 2 seconds, then the io_service queue is backing up, so add a thread to the thread pool.

Example:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <iostream>

class thread_pool_checker
  : private boost::noncopyable
{
public:

  thread_pool_checker( boost::asio::io_service& io_service,
                       boost::thread_group& threads,
                       unsigned int max_threads,
                       long threshold_seconds,
                       long periodic_seconds )
    : io_service_( io_service ),
      timer_( io_service ),
      threads_( threads ),
      max_threads_( max_threads ),
      threshold_seconds_( threshold_seconds ),
      periodic_seconds_( periodic_seconds )
    {
      schedule_check();
    }

private:

  void schedule_check();
  void on_check( const boost::system::error_code& error );

private:

  boost::asio::io_service&    io_service_;
  boost::asio::deadline_timer timer_;
  boost::thread_group&        threads_;
  unsigned int                max_threads_;
  long                        threshold_seconds_;
  long                        periodic_seconds_;
};

void thread_pool_checker::schedule_check()
{
  // Thread pool is already at max size.
  if ( max_threads_ <= threads_.size() )
  {
    std::cout << "Thread pool has reached its max.  Example will shutdown."
              << std::endl;
    io_service_.stop();
    return;
  }

  // Schedule check to see if pool needs to increase.
  std::cout << "Will check if pool needs to increase in " 
            << periodic_seconds_ << " seconds." << std::endl;
  timer_.expires_from_now( boost::posix_time::seconds( periodic_seconds_ ) );
  timer_.async_wait( 
    boost::bind( &thread_pool_checker::on_check, this,
                 boost::asio::placeholders::error ) );
}

void thread_pool_checker::on_check( const boost::system::error_code& error )
{
  // On error, return early.
  if ( error ) return;

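  // Check how long this job was waiting in the service queue.
  // expires_from_now() returns the expiration time relative to now, so
  // if the timer expired 7 seconds ago, the delta is -7 seconds.  Use
  // total_seconds() rather than seconds(), which returns only the
  // seconds field of the duration and drops whole minutes and hours.
  boost::posix_time::time_duration delta = timer_.expires_from_now();
  long wait_in_seconds = -delta.total_seconds();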

  // If the time delta is greater than the threshold, then the job
  // remained in the service queue for too long, so increase the
  // thread pool.
  std::cout << "Job job sat in queue for " 
            << wait_in_seconds << " seconds." << std::endl;
  if ( threshold_seconds_ < wait_in_seconds )
  {
    std::cout << "Increasing thread pool." << std::endl;
    threads_.create_thread(
      boost::bind( &boost::asio::io_service::run,
                   &io_service_ ) );
  }

  // Schedule another pool check.  schedule_check() stops the example
  // instead if the pool has reached its maximum size.
  schedule_check();
}

// Busy work functions.
void busy_work( boost::asio::io_service&,
                unsigned int );

void add_busy_work( boost::asio::io_service& io_service,
                    unsigned int count )
{
  io_service.post(
    boost::bind( busy_work,
                 boost::ref( io_service ),
                 count ) );
}

void busy_work( boost::asio::io_service& io_service,
                unsigned int count )
{
  boost::this_thread::sleep( boost::posix_time::seconds( 5 ) );

  count += 1;

  // When the count is 3, spawn additional busy work.
  if ( 3 == count )
  {
    add_busy_work( io_service, 0 );
  }
  add_busy_work( io_service, count );
}

int main()
{
  // Create io service.
  boost::asio::io_service io_service;

  // Add some busy work to the service.
  add_busy_work( io_service, 0 );

  // Create thread group and thread_pool_checker.
  boost::thread_group threads;
  thread_pool_checker checker( io_service, threads,
                               3,   // Max pool size.
                               2,   // Create thread if job waits for 2 sec.
                               3 ); // Check if pool needs to grow every 3 sec.

  // Start running the io service.
  io_service.run();

  threads.join_all();

  return 0;
}

Output:

Will check if pool needs to increase in 3 seconds.
Job job sat in queue for 7 seconds.
Increasing thread pool.
Will check if pool needs to increase in 3 seconds.
Job job sat in queue for 0 seconds.
Will check if pool needs to increase in 3 seconds.
Job job sat in queue for 4 seconds.
Increasing thread pool.
Will check if pool needs to increase in 3 seconds.
Job job sat in queue for 0 seconds.
Will check if pool needs to increase in 3 seconds.
Job job sat in queue for 0 seconds.
Will check if pool needs to increase in 3 seconds.
Job job sat in queue for 0 seconds.
Will check if pool needs to increase in 3 seconds.
Job job sat in queue for 3 seconds.
Increasing thread pool.
Thread pool has reached its max.  Example will shutdown.
Tanner Sansbury answered Oct 19 '22 07:10