I am trying to create a limited thread pool class using boost::asio, but I am stuck at one point. Can someone help me?
The only problem is: where should I decrease the counter?
The code does not work as expected.
The problem is that I don't know when my thread will finish execution, or how I will know that it has returned to the pool.
#include <boost/asio.hpp>
#include <iostream>
#include <boost/thread/thread.hpp>
#include <boost/bind.hpp>
#include <boost/thread/mutex.hpp>
#include <stack>
using namespace std;
using namespace boost;
class ThreadPool
{
static int count;
int NoOfThread;
thread_group grp;
mutex mutex_;
asio::io_service io_service;
int counter;
stack<thread*> thStk ;
public:
ThreadPool(int num)
{
NoOfThread = num;
counter = 0;
mutex::scoped_lock lock(mutex_);
if(count == 0)
count++;
else
return;
for(int i=0 ; i<num ; ++i)
{
thStk.push(grp.create_thread(boost::bind(&asio::io_service::run, &io_service)));
}
}
~ThreadPool()
{
io_service.stop();
grp.join_all();
}
thread* getThread()
{
if(counter > NoOfThread)
{
cout<<"run out of threads \n";
return NULL;
}
counter++;
thread* ptr = thStk.top();
thStk.pop();
return ptr;
}
};
int ThreadPool::count = 0;
struct callable
{
void operator()()
{
cout<<"some task for thread \n";
}
};
int main( int argc, char * argv[] )
{
callable x;
ThreadPool pool(10);
thread* p = pool.getThread();
cout<<p->get_id();
//how i can assign some function to thread pointer ?
//how i can return thread pointer after work done so i can add
//it back to stack?
return 0;
}
If the run() method is called on an object of type boost::asio::io_service, the associated handlers are invoked on the same thread. By using multiple threads, an application can call multiple run() methods simultaneously.
In an Asio multithreaded environment, the only thing you need to get your completion handlers synchronized properly is an io_context::strand instance. It works simply: completion handlers attached to the same io_context::strand will be invoked serially.
Boost.Asio is a cross-platform C++ library for network and low-level I/O programming that provides developers with a consistent asynchronous model using a modern C++ approach.
A thread pool manages a set of anonymous threads that perform work on request. The threads do not terminate right away. When one of the threads completes a task, the thread becomes idle, ready to be dispatched to another task.
In short, you need to wrap the user's provided task with another function that will invoke the user's function or callable object, then lock the mutex and update the counter.
I may not be understanding all the requirements for this thread pool. Thus, for clarity, here is an explicit list as to what I believe are the requirements:
Before I provide an implementation, there are a few key points I would like to stress:
To let a single thread execute multiple functions over its lifetime, the thread runs a function that reads from a queue, such as io_service::run(), and callable types are posted into the event queue, such as from io_service::post().
io_service::run() returns if there is no work pending in the io_service, the io_service is stopped, or an exception is thrown from a handler that the thread was running. To prevent io_service::run() from returning when there is no unfinished work, the io_service::work class can be used.
Requiring only that the task be callable (i.e. invocable with the object() syntax) instead of requiring a type (i.e. task must inherit from process) provides more flexibility to the user. It allows the user to supply a task as a function pointer or a type providing a nullary operator().
Implementation using boost::asio:
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>
class thread_pool
{
private:
boost::asio::io_service io_service_;
boost::asio::io_service::work work_;
boost::thread_group threads_;
std::size_t available_;
boost::mutex mutex_;
public:
/// @brief Constructor.
thread_pool( std::size_t pool_size )
: work_( io_service_ ),
available_( pool_size )
{
for ( std::size_t i = 0; i < pool_size; ++i )
{
threads_.create_thread( boost::bind( &boost::asio::io_service::run,
&io_service_ ) );
}
}
/// @brief Destructor.
~thread_pool()
{
// Force all threads to return from io_service::run().
io_service_.stop();
// Suppress all exceptions.
try
{
threads_.join_all();
}
catch ( const std::exception& ) {}
}
/// @brief Adds a task to the thread pool if a thread is currently available.
template < typename Task >
void run_task( Task task )
{
boost::unique_lock< boost::mutex > lock( mutex_ );
// If no threads are available, then return.
if ( 0 == available_ ) return;
// Decrement count, indicating thread is no longer available.
--available_;
// Post a wrapped task into the queue.
io_service_.post( boost::bind( &thread_pool::wrap_task, this,
boost::function< void() >( task ) ) );
}
private:
/// @brief Wrap a task so that the available count can be increased once
/// the user provided task has completed.
void wrap_task( boost::function< void() > task )
{
// Run the user supplied task.
try
{
task();
}
// Suppress all exceptions.
catch ( const std::exception& ) {}
// Task has finished, so increment count of available threads.
boost::unique_lock< boost::mutex > lock( mutex_ );
++available_;
}
};
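The counter handling is the part the question asks about, so it may help to see it in isolation. The following is only a sketch under my own assumptions: slot_counter and wrap are hypothetical names that do not appear in the code above, and std::mutex and std::function stand in for the Boost equivalents so the snippet compiles without Boost.

```cpp
#include <cassert>
#include <cstddef>
#include <exception>
#include <functional>
#include <mutex>

// Hypothetical helper, not part of the answer's code: counts free threads.
class slot_counter
{
public:
  explicit slot_counter(std::size_t slots) : available_(slots) {}

  // Claim a slot. Returns false when every slot is in use.
  bool try_acquire()
  {
    std::lock_guard<std::mutex> lock(mutex_);
    if (available_ == 0) return false;
    --available_;
    return true;
  }

  // Give a slot back once its task has finished.
  void release()
  {
    std::lock_guard<std::mutex> lock(mutex_);
    ++available_;
  }

  std::size_t available() const
  {
    std::lock_guard<std::mutex> lock(mutex_);
    return available_;
  }

private:
  mutable std::mutex mutex_;
  std::size_t available_;
};

// Wrap a task so that the slot is released after the task has run,
// including when it throws something derived from std::exception.
// The returned callable must not outlive `slots`.
inline std::function<void()> wrap(slot_counter& slots, std::function<void()> task)
{
  assert(task && "wrap() expects a callable task");
  return [&slots, task]() {
    try { task(); }
    catch (const std::exception&) {}
    slots.release();
  };
}
```

The thread_pool above applies the same pattern: run_task claims a slot before posting, and wrap_task gives it back after the user's task returns.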
A few comments about the implementation:
If the user's task throws an exception that is not of type boost::thread_interrupted and it is not caught, then std::terminate() is called. This is the result of Boost.Thread's exceptions in thread functions behavior. It is also worth reading Boost.Asio's effect of exceptions thrown from handlers.
If the user provides the task via boost::bind, then the nested boost::bind will fail to compile. One of the following options is required:
- Do not support a task created by boost::bind.
- Detect at compile time whether the user's type is the result of boost::bind so that boost::protect could be used, as boost::protect only functions properly on certain function objects.
- Use another type to pass the task object indirectly. I opted to use boost::function for readability at the cost of losing the exact type. boost::tuple, while slightly less readable, could also be used to preserve the exact type, as seen in the Boost.Asio's serialization example.
Application code can now use the thread_pool type non-intrusively:
void work() {};
struct worker
{
void operator()() {};
};
void more_work( int ) {};
int main()
{
thread_pool pool( 2 );
pool.run_task( work ); // Function pointer.
pool.run_task( worker() ); // Callable object.
pool.run_task( boost::bind( more_work, 5 ) ); // Callable object.
}
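The nested-bind point from the comments above is easier to see in a small example. This is a sketch only: it uses std::bind and std::function as stand-ins for boost::bind and boost::function (they treat nested bind expressions in the same way as far as this point is concerned), and run_wrapped, make_handler, and last_value are hypothetical names introduced for illustration.

```cpp
#include <cassert>
#include <functional>

int last_value = 0;  // Records what more_work() received, for illustration only.

void more_work(int value) { last_value = value; }

// Stand-in for thread_pool::wrap_task: runs the user's task.
void run_wrapped(std::function<void()> task) { task(); }

// Builds the handler that would be posted to the queue.
std::function<void()> make_handler(std::function<void()> task)
{
  assert(task && "make_handler() expects a callable task");
  // `task` is a std::function, not a bind expression, so std::bind stores it
  // as an ordinary argument and passes it to run_wrapped() unevaluated.
  return std::bind(&run_wrapped, task);
  // By contrast, std::bind(&run_wrapped, std::bind(more_work, 5)) would treat
  // the inner bind as a subexpression, call it, and try to pass its void
  // result to run_wrapped(), which does not compile when invoked.
}
```

Calling make_handler( std::bind( more_work, 5 ) ) converts the bind expression to std::function at the call site, which is the same type erasure the run_task member performs with boost::function.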
The thread_pool could be created without Boost.Asio, and may be slightly easier for maintainers, as they no longer need to know about Boost.Asio behaviors, such as when io_service::run() returns and what the io_service::work object is:
#include <queue>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>
class thread_pool
{
private:
std::queue< boost::function< void() > > tasks_;
boost::thread_group threads_;
std::size_t available_;
boost::mutex mutex_;
boost::condition_variable condition_;
bool running_;
public:
/// @brief Constructor.
thread_pool( std::size_t pool_size )
: available_( pool_size ),
running_( true )
{
for ( std::size_t i = 0; i < pool_size; ++i )
{
threads_.create_thread( boost::bind( &thread_pool::pool_main, this ) ) ;
}
}
/// @brief Destructor.
~thread_pool()
{
// Set running flag to false then notify all threads.
{
boost::unique_lock< boost::mutex > lock( mutex_ );
running_ = false;
condition_.notify_all();
}
try
{
threads_.join_all();
}
// Suppress all exceptions.
catch ( const std::exception& ) {}
}
/// @brief Add task to the thread pool if a thread is currently available.
template < typename Task >
void run_task( Task task )
{
boost::unique_lock< boost::mutex > lock( mutex_ );
// If no threads are available, then return.
if ( 0 == available_ ) return;
// Decrement count, indicating thread is no longer available.
--available_;
// Set task and signal condition variable so that a worker thread will
// wake up and use the task.
tasks_.push( boost::function< void() >( task ) );
condition_.notify_one();
}
private:
/// @brief Entry point for pool threads.
void pool_main()
{
while( running_ )
{
// Wait on condition variable while the task is empty and the pool is
// still running.
boost::unique_lock< boost::mutex > lock( mutex_ );
while ( tasks_.empty() && running_ )
{
condition_.wait( lock );
}
// If pool is no longer running, break out.
if ( !running_ ) break;
// Copy task locally and remove from the queue. This is done within
// its own scope so that the task object is destructed immediately
// after running the task. This is useful in the event that the
// function contains shared_ptr arguments bound via bind.
{
boost::function< void() > task = tasks_.front();
tasks_.pop();
lock.unlock();
// Run the task.
try
{
task();
}
// Suppress all exceptions.
catch ( const std::exception& ) {}
}
// Task has finished, so increment count of available threads.
lock.lock();
++available_;
} // while running_
}
};
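To illustrate the discard behavior, here is a rough sketch of the same design using only the C++11 standard library instead of Boost.Thread. The names std_thread_pool and demo_discard are made up for this example, and having run_task return bool is my assumption so the caller can see that a task was dropped; the code above returns void.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <exception>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Sketch only: std:: counterpart of the condition-variable pool above.
class std_thread_pool
{
public:
  explicit std_thread_pool(std::size_t pool_size)
    : available_(pool_size), running_(true)
  {
    for (std::size_t i = 0; i < pool_size; ++i)
      threads_.emplace_back(&std_thread_pool::pool_main, this);
  }

  ~std_thread_pool()
  {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      running_ = false;
    }
    condition_.notify_all();
    for (std::thread& t : threads_) t.join();
  }

  // Assumption: returns false when no thread is free and the task is dropped.
  bool run_task(std::function<void()> task)
  {
    std::lock_guard<std::mutex> lock(mutex_);
    if (available_ == 0) return false;
    --available_;
    tasks_.push(std::move(task));
    condition_.notify_one();
    return true;
  }

private:
  void pool_main()
  {
    std::unique_lock<std::mutex> lock(mutex_);
    for (;;)
    {
      // Sleep until there is a task or the pool is shutting down.
      condition_.wait(lock, [this] { return !tasks_.empty() || !running_; });
      if (!running_) return;
      std::function<void()> task = std::move(tasks_.front());
      tasks_.pop();
      lock.unlock();
      try { task(); }
      catch (const std::exception&) {}
      task = nullptr;  // Destroy bound arguments before taking the lock again.
      lock.lock();
      ++available_;    // This thread is available again.
    }
  }

  std::queue<std::function<void()>> tasks_;
  std::vector<std::thread> threads_;
  std::size_t available_;
  std::mutex mutex_;
  std::condition_variable condition_;
  bool running_;
};

// One thread, two tasks: the second is refused while the first is blocked.
bool demo_discard()
{
  std::promise<void> gate;
  std::shared_future<void> opened = gate.get_future().share();
  std::atomic<int> ran(0);
  std_thread_pool pool(1);
  bool first = pool.run_task([opened, &ran] { opened.wait(); ++ran; });
  bool second = pool.run_task([&ran] { ++ran; });
  gate.set_value();
  while (ran.load() != 1) std::this_thread::yield();
  return first && !second;
}
```

Because the count is decremented when the task is accepted rather than when a thread picks it up, the second call is refused even if the worker has not started the first task yet.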