I'm going to implement boost::asio server with a thread pool using single io_service
( HTTP Server 3
example ). io_service
will be bound to unix domain socket and pass requests going from connections on this socket to different threads. In order to reduce resource consumption I want to make the thread pool dynamic.
Here is a concept. Firstly a single thread is created. When a request arrives and server sees that there is no idle thread in a pool it creates a new thread and passes the request to it. The server can create up to some maximum number of threads. Ideally it should have functinality of suspending threads which are idle for some time.
Did somebody make something similar? Or maybe somebody has a relevant example?
As for me, I guess I should somehow override io_service.dispatch
to achieve that.
If the run() method is called on an object of type boost::asio::io_service, the associated handlers are invoked on the same thread. By using multiple threads, an application can call multiple run() methods simultaneously.
What is ThreadPool in Java? A thread pool reuses previously created threads to execute current tasks and offers a solution to the problem of thread cycle overhead and resource thrashing.
Once a thread in the thread pool completes its task, it's returned to a queue of waiting threads. From this moment it can be reused. This reuse enables applications to avoid the cost of creating a new thread for each task. There is only one thread pool per process.
A thread pool manages a set of anonymous threads that perform work on request. The threads do not terminate right away. When one of the threads completes a task, the thread becomes idle, ready to be dispatched to another task.
There may be a few challenges with the initial approach:
boost::asio::io_service
is not intended to be derived from or reimplemented. Note the lack of virtual functions.An alternative solution is to post a job into the io_service
, then check how long it sat in the io_service
. If the time delta between when it was ready-to-run and when it was actually ran is above a certain threshold, then this indicates there are more jobs in the queue than threads servicing the queue. A major benefit to this is that the dynamic thread pool growth logic becomes decoupled from other logic.
Here is an example that accomplishes this by using the deadline_timer
.
deadline_timer
to expire 3
seconds from now.deadline_timer
. The handler will be ready-to-run 3
seconds from when the deadline_timer
was set.2
seconds, then the io_service
queue is backing up, so add a thread to the thread pool.Example:
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <iostream>
class thread_pool_checker
: private boost::noncopyable
{
public:
thread_pool_checker( boost::asio::io_service& io_service,
boost::thread_group& threads,
unsigned int max_threads,
long threshold_seconds,
long periodic_seconds )
: io_service_( io_service ),
timer_( io_service ),
threads_( threads ),
max_threads_( max_threads ),
threshold_seconds_( threshold_seconds ),
periodic_seconds_( periodic_seconds )
{
schedule_check();
}
private:
void schedule_check();
void on_check( const boost::system::error_code& error );
private:
boost::asio::io_service& io_service_;
boost::asio::deadline_timer timer_;
boost::thread_group& threads_;
unsigned int max_threads_;
long threshold_seconds_;
long periodic_seconds_;
};
void thread_pool_checker::schedule_check()
{
// Thread pool is already at max size.
if ( max_threads_ <= threads_.size() )
{
std::cout << "Thread pool has reached its max. Example will shutdown."
<< std::endl;
io_service_.stop();
return;
}
// Schedule check to see if pool needs to increase.
std::cout << "Will check if pool needs to increase in "
<< periodic_seconds_ << " seconds." << std::endl;
timer_.expires_from_now( boost::posix_time::seconds( periodic_seconds_ ) );
timer_.async_wait(
boost::bind( &thread_pool_checker::on_check, this,
boost::asio::placeholders::error ) );
}
void thread_pool_checker::on_check( const boost::system::error_code& error )
{
// On error, return early.
if ( error ) return;
// Check how long this job was waiting in the service queue. This
// returns the expiration time relative to now. Thus, if it expired
// 7 seconds ago, then the delta time is -7 seconds.
boost::posix_time::time_duration delta = timer_.expires_from_now();
long wait_in_seconds = -delta.seconds();
// If the time delta is greater than the threshold, then the job
// remained in the service queue for too long, so increase the
// thread pool.
std::cout << "Job job sat in queue for "
<< wait_in_seconds << " seconds." << std::endl;
if ( threshold_seconds_ < wait_in_seconds )
{
std::cout << "Increasing thread pool." << std::endl;
threads_.create_thread(
boost::bind( &boost::asio::io_service::run,
&io_service_ ) );
}
// Otherwise, schedule another pool check.
schedule_check();
}
// Busy work functions.
void busy_work( boost::asio::io_service&,
unsigned int );
void add_busy_work( boost::asio::io_service& io_service,
unsigned int count )
{
io_service.post(
boost::bind( busy_work,
boost::ref( io_service ),
count ) );
}
void busy_work( boost::asio::io_service& io_service,
unsigned int count )
{
boost::this_thread::sleep( boost::posix_time::seconds( 5 ) );
count += 1;
// When the count is 3, spawn additional busy work.
if ( 3 == count )
{
add_busy_work( io_service, 0 );
}
add_busy_work( io_service, count );
}
int main()
{
using boost::asio::ip::tcp;
// Create io service.
boost::asio::io_service io_service;
// Add some busy work to the service.
add_busy_work( io_service, 0 );
// Create thread group and thread_pool_checker.
boost::thread_group threads;
thread_pool_checker checker( io_service, threads,
3, // Max pool size.
2, // Create thread if job waits for 2 sec.
3 ); // Check if pool needs to grow every 3 sec.
// Start running the io service.
io_service.run();
threads.join_all();
return 0;
}
Output:
Will check if pool needs to increase in 3 seconds. Job job sat in queue for 7 seconds. Increasing thread pool. Will check if pool needs to increase in 3 seconds. Job job sat in queue for 0 seconds. Will check if pool needs to increase in 3 seconds. Job job sat in queue for 4 seconds. Increasing thread pool. Will check if pool needs to increase in 3 seconds. Job job sat in queue for 0 seconds. Will check if pool needs to increase in 3 seconds. Job job sat in queue for 0 seconds. Will check if pool needs to increase in 3 seconds. Job job sat in queue for 0 seconds. Will check if pool needs to increase in 3 seconds. Job job sat in queue for 3 seconds. Increasing thread pool. Thread pool has reached its max. Example will shutdown.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With