Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Thread pool using boost asio

I am trying to create a limited thread pool class using boost::asio. But I am stuck at one point can some one help me.

The only problem is the place where I should decrease counter?

code does not work as expected.

the problem is I don't know when my thread will finish execution and how I will come to know that it has return to pool

#include <boost/asio.hpp>
#include <iostream>
#include <boost/thread/thread.hpp>
#include <boost/bind.hpp>
#include <boost/thread/mutex.hpp>
#include <stack>

using namespace std;
using namespace boost;

class ThreadPool
{
    static int count;
    int NoOfThread;
    thread_group grp;
    mutex mutex_;
    asio::io_service io_service;
    int counter;
    stack<thread*> thStk ;

public:
    ThreadPool(int num)
    {   
        NoOfThread = num;
        counter = 0;
        mutex::scoped_lock lock(mutex_);

        if(count == 0)
            count++;
        else
            return;

        for(int i=0 ; i<num ; ++i)
        {
            thStk.push(grp.create_thread(boost::bind(&asio::io_service::run, &io_service)));
        }
    }
    ~ThreadPool()
    {
        io_service.stop();
        grp.join_all();
    }

    thread* getThread()
    {
        if(counter > NoOfThread)
        {
            cout<<"run out of threads \n";
            return NULL;
        }

        counter++;
        thread* ptr = thStk.top();
        thStk.pop();
        return ptr;
    }
};
int ThreadPool::count = 0;


struct callable
{
    void operator()()
    {
        cout<<"some task for thread \n";
    }
};

int main( int argc, char * argv[] )
{

    callable x;
    ThreadPool pool(10);
    thread* p = pool.getThread();
    cout<<p->get_id();

    //how i can assign some function to thread pointer ?
    //how i can return thread pointer after work done so i can add 
//it back to stack?


    return 0;
}
like image 455
vivek Avatar asked Aug 31 '12 12:08

vivek


People also ask

Does boost ASIO use threads?

If the run() method is called on an object of type boost::asio::io_service, the associated handlers are invoked on the same thread. By using multiple threads, an application can call multiple run() methods simultaneously.

Is ASIO multithreaded?

Asio multithreaded environment. The only thing you need to get your completion handlers synchronized properly is io_context::strand class instance. It works pretty simple: completion handlers attached to the same io_context::strand will be invoked serially.

What is ASIO boost?

Boost. Asio is a cross-platform C++ library for network and low-level I/O programming that provides developers with a consistent asynchronous model using a modern C++ approach. Overview.

What is a pool in multithreading?

A thread pool manages a set of anonymous threads that perform work on request. The threads do not terminate right away. When one of the threads completes a task, the thread becomes idle, ready to be dispatched to another task.


1 Answers

In short, you need to wrap the user's provided task with another function that will:

  • Invoke the user function or callable object.
  • Lock the mutex and decrement the counter.

I may not be understanding all the requirements for this thread pool. Thus, for clarity, here is an explicit list as to what I believe are the requirements:

  • The pool manages the lifetime of the threads. The user should not be able to delete threads that reside within the pool.
  • The user can assign a task to the pool in a non-intrusive way.
  • When a task is being assigned, if all threads in the pool are currently running other tasks, then the task is discarded.

Before I provide an implementation, there are a few key points I would like to stress:

  • Once a thread has been launched, it will run until completion, cancellation, or termination. The function the thread is executing cannot be reassigned. To allow for a single thread to execute multiple functions over the course of its life, the thread will want to launch with a function that will read from a queue, such as io_service::run(), and callable types are posted into the event queue, such as from io_service::post().
  • io_service::run() returns if there is no work pending in the io_service, the io_service is stopped, or an exception is thrown from a handler that the thread was running. To prevent io_serivce::run() from returning when there is no unfinished work, the io_service::work class can be used.
  • Defining the task's type requirements (i.e. the task's type must be callable by object() syntax) instead of requiring a type (i.e. task must inherit from process), provides more flexibility to the user. It allows the user to supply a task as a function pointer or a type providing a nullary operator().

Implementation using boost::asio:

#include <boost/asio.hpp>
#include <boost/thread.hpp>

class thread_pool
{
private:
  boost::asio::io_service io_service_;
  boost::asio::io_service::work work_;
  boost::thread_group threads_;
  std::size_t available_;
  boost::mutex mutex_;
public:

  /// @brief Constructor.
  thread_pool( std::size_t pool_size )
    : work_( io_service_ ),
      available_( pool_size )
  {
    for ( std::size_t i = 0; i < pool_size; ++i )
    {
      threads_.create_thread( boost::bind( &boost::asio::io_service::run,
                                           &io_service_ ) );
    }
  }

  /// @brief Destructor.
  ~thread_pool()
  {
    // Force all threads to return from io_service::run().
    io_service_.stop();

    // Suppress all exceptions.
    try
    {
      threads_.join_all();
    }
    catch ( const std::exception& ) {}
  }

  /// @brief Adds a task to the thread pool if a thread is currently available.
  template < typename Task >
  void run_task( Task task )
  {
    boost::unique_lock< boost::mutex > lock( mutex_ );

    // If no threads are available, then return.
    if ( 0 == available_ ) return;

    // Decrement count, indicating thread is no longer available.
    --available_;

    // Post a wrapped task into the queue.
    io_service_.post( boost::bind( &thread_pool::wrap_task, this,
                                   boost::function< void() >( task ) ) );
  }

private:
  /// @brief Wrap a task so that the available count can be increased once
  ///        the user provided task has completed.
  void wrap_task( boost::function< void() > task )
  {
    // Run the user supplied task.
    try
    {
      task();
    }
    // Suppress all exceptions.
    catch ( const std::exception& ) {}

    // Task has finished, so increment count of available threads.
    boost::unique_lock< boost::mutex > lock( mutex_ );
    ++available_;
  }
};

A few comments about the implementation:

  • Exception handling needs to occur around the user's task. If the user's function or callable object throws an exception that is not of type boost::thread_interrupted, then std::terminate() is called. This is the the result of Boost.Thread's exceptions in thread functions behavior. It is also worth reading Boost.Asio's effect of exceptions thrown from handlers.
  • If the user provides the task via boost::bind, then the nested boost::bind will fail to compile. One of the following options is required:
    • Not support task created by boost::bind.
    • Meta-programming to perform compile-time branching based on whether or not the user's type if the result of boost::bind so that boost::protect could be used, as boost::protect only functions properly on certain function objects.
    • Use another type to pass the task object indirectly. I opted to use boost::function for readability at the cost of losing the exact type. boost::tuple, while slightly less readable, could also be used to preserve the exact type, as seen in the Boost.Asio's serialization example.

Application code can now use the thread_pool type non-intrusively:

void work() {};

struct worker
{
  void operator()() {};
};

void more_work( int ) {};

int main()
{ 
  thread_pool pool( 2 );
  pool.run_task( work );                        // Function pointer.
  pool.run_task( worker() );                    // Callable object.
  pool.run_task( boost::bind( more_work, 5 ) ); // Callable object.
}

The thread_pool could be created without Boost.Asio, and may be slightly easier for maintainers, as they no longer need to know about Boost.Asio behaviors, such as when does io_service::run() return, and what is the io_service::work object:

#include <queue>
#include <boost/bind.hpp>
#include <boost/thread.hpp>

class thread_pool
{
private:
  std::queue< boost::function< void() > > tasks_;
  boost::thread_group threads_;
  std::size_t available_;
  boost::mutex mutex_;
  boost::condition_variable condition_;
  bool running_;
public:

  /// @brief Constructor.
  thread_pool( std::size_t pool_size )
    : available_( pool_size ),
      running_( true )
  {
    for ( std::size_t i = 0; i < pool_size; ++i )
    {
      threads_.create_thread( boost::bind( &thread_pool::pool_main, this ) ) ;
    }
  }

  /// @brief Destructor.
  ~thread_pool()
  {
    // Set running flag to false then notify all threads.
    {
      boost::unique_lock< boost::mutex > lock( mutex_ );
      running_ = false;
      condition_.notify_all();
    }

    try
    {
      threads_.join_all();
    }
    // Suppress all exceptions.
    catch ( const std::exception& ) {}
  }

  /// @brief Add task to the thread pool if a thread is currently available.
  template < typename Task >
  void run_task( Task task )
  {
    boost::unique_lock< boost::mutex > lock( mutex_ );

    // If no threads are available, then return.
    if ( 0 == available_ ) return;

    // Decrement count, indicating thread is no longer available.
    --available_;

    // Set task and signal condition variable so that a worker thread will
    // wake up andl use the task.
    tasks_.push( boost::function< void() >( task ) );
    condition_.notify_one();
  }

private:
  /// @brief Entry point for pool threads.
  void pool_main()
  {
    while( running_ )
    {
      // Wait on condition variable while the task is empty and the pool is
      // still running.
      boost::unique_lock< boost::mutex > lock( mutex_ );
      while ( tasks_.empty() && running_ )
      {
        condition_.wait( lock );
      }
      // If pool is no longer running, break out.
      if ( !running_ ) break;

      // Copy task locally and remove from the queue.  This is done within
      // its own scope so that the task object is destructed immediately
      // after running the task.  This is useful in the event that the
      // function contains shared_ptr arguments bound via bind.
      {
        boost::function< void() > task = tasks_.front();
        tasks_.pop();

        lock.unlock();

        // Run the task.
        try
        {
          task();
        }
        // Suppress all exceptions.
        catch ( const std::exception& ) {}
      }

      // Task has finished, so increment count of available threads.
      lock.lock();
      ++available_;
    } // while running_
  }
};
like image 66
Tanner Sansbury Avatar answered Oct 16 '22 20:10

Tanner Sansbury