Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

boost::asio and Active Object

I have implemented some module based Active Object design pattern. It is very simple implementation. I have Scheduler, ActivationList, Requests and Futures to get response. My requirements were like that:

  • Access to active object shall be serialized by executing its methods within its own thread (main req and assumption of Active Object design pattern)
  • Caller shall be able to specify the priority of requests execution. It means that if there is more than zero requests waiting for execution, they shall be ordered by the priority assigned to each request. Requests with higher priority shall be executed first so if there will be some requests pending on the ActivationList always and they will have higher priority than a given requests, this request will never be executed - its OK for me
  • It shall be possible to specify the maximum number of requests pending on the list (limit the memory usage)
  • It shall be possible to invalidate all pending requests
  • Requests shall be able to return values (blocking the caller) OR just shall be executed without value return but caller shall be blocked until request is processed OR caller shall not be blocked and it is not important for it if given request has been processed or not g
  • Just before request execution, some guard method shall be executed to check if given request shall be executed or not. If not - it shall return some undefined value to caller (in my current implementation it is boost::none, because each request return type is boost::optional)

OK now question: Is it possible to use boost::asio and fulfill all my requirements? My implementation is working but I would like to use something what is probably implemented in much better way than I have done this. Also I would like to know it for the future and do not "reinvent the wheel" once again.

like image 884
user2301299 Avatar asked May 01 '13 01:05

user2301299


1 Answers

Boost.Asio can be used to encompass the intention of Active Object: decouple method execution from method invocation. Additional requirements will need to be handled at a higher-level, but it is not overly complex when using Boost.Asio in conjunction with other Boost libraries.

Scheduler could use:

  • boost::thread for thread abstraction.
  • boost::thread_group to manage lifetime of threads.
  • boost::asio::io_service to provide a threadpool. Will likely want to use boost::asio::io_service::work to keep threads alive when no work is pending.

ActivationList could be implemented as:

  • A Boost.MultiIndex for obtaining highest priority method request. With a hinted-position insert(), the insertion order is preserved for request with the same priority.
  • std::multiset or std::multimap can be used. However, it is unspecified in C++03 as to the order of request with the same key (priority).
  • If Request do not need an guard method, then std::priority_queue could be used.

Request could be an unspecified type:

  • boost::function and boost::bind could be used to provide a type-erasure, while binding to callable types without introducing a Request hierarchy.

Futures could use Boost.Thread's Futures support.

  • future.valid() will return true if Request has been added to ActivationList.
  • future.wait() will block waiting for a result to become available.
  • future.get() will block waiting for the result.
  • If caller does nothing with the future, then caller will not be blocked.
  • Another benefit to using Boost.Thread's Futures is that exceptions originating from within a Request will be passed to the Future.

Here is a complete example leveraging various Boost libraries and should meet the requirements:

// Standard includes
#include <algorithm> // std::find_if
#include <iostream>
#include <string>

// 3rd party includes
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/make_shared.hpp>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/utility/result_of.hpp>

/// @brief scheduler that provides limits with prioritized jobs.
template <typename Priority,
          typename Compare = std::less<Priority> >
class scheduler
{
public:
  typedef Priority priority_type;
private:

  /// @brief method_request is used to couple the guard and call
  ///        functions for a given method.
  struct method_request
  {
    typedef boost::function<bool()> ready_func_type;
    typedef boost::function<void()> run_func_type;

    template <typename ReadyFunctor,
              typename RunFunctor>
    method_request(ReadyFunctor ready,
                   RunFunctor run)
      : ready(ready),
        run(run)
    {}

    ready_func_type ready;
    run_func_type run;
  };

  /// @brief Pair type used to associate a request with its priority.
  typedef std::pair<priority_type,
                    boost::shared_ptr<method_request> > pair_type;

  static bool is_method_ready(const pair_type& pair)
  {
    return pair.second->ready();
  }

public:

  /// @brief Construct scheduler.
  ///
  /// @param max_threads Maximum amount of concurrent task.
  /// @param max_request Maximum amount of request.  
  scheduler(std::size_t max_threads,
            std::size_t max_request)
    : work_(io_service_),
      max_request_(max_request),
      request_count_(0)
  {
    // Spawn threads, dedicating them to the io_service.
    for (std::size_t i = 0; i < max_threads; ++i)
      threads_.create_thread(
        boost::bind(&boost::asio::io_service::run, &io_service_));
  }

  /// @brief Destructor.
  ~scheduler()
  {
    // Release threads from the io_service.
    io_service_.stop();
    // Cleanup.
    threads_.join_all();
  }

  /// @brief Insert a method request into the scheduler.
  ///
  /// @param priority Priority of job.
  /// @param ready_func Invoked to check if method is ready to run.
  /// @param run_func Invoked when ready to run.
  ///
  /// @return future associated with the method.
  template <typename ReadyFunctor,
            typename RunFunctor>
  boost::unique_future<typename boost::result_of<RunFunctor()>::type>
  insert(priority_type priority, 
         const ReadyFunctor& ready_func,
         const RunFunctor& run_func)
  {
    typedef typename boost::result_of<RunFunctor()>::type result_type;
    typedef boost::unique_future<result_type> future_type;

    boost::unique_lock<mutex_type> lock(mutex_);

    // If max request has been reached, then return an invalid future.
    if (max_request_ &&
        (request_count_ == max_request_))
      return future_type();

    ++request_count_;

    // Use a packaged task to handle populating promise and future.
    typedef boost::packaged_task<result_type> task_type;

    // Bind does not work with rvalue, and packaged_task is only moveable,
    // so allocate a shared pointer.
    boost::shared_ptr<task_type> task = 
      boost::make_shared<task_type>(run_func);

    // Create method request.
    boost::shared_ptr<method_request> request =
      boost::make_shared<method_request>(
        ready_func,
        boost::bind(&task_type::operator(), task));

    // Insert into priority.  Hint to inserting as close to the end as
    // possible to preserve insertion order for request with same priority.
    activation_list_.insert(activation_list_.end(),
                            pair_type(priority, request));

    // There is now an outstanding request, so post to dispatch.
    io_service_.post(boost::bind(&scheduler::dispatch, this));

    return task->get_future();
  }

  /// @brief Insert a method request into the scheduler.
  ///
  /// @param ready_func Invoked to check if method is ready to run.
  /// @param run_func Invoked when ready to run.
  ///
  /// @return future associated with the method.
  template <typename ReadyFunctor,
            typename RunFunctor>
  boost::unique_future<typename boost::result_of<RunFunctor()>::type>
  insert(const ReadyFunctor& ready_func,
         const RunFunctor& run_func)
  {
    return insert(priority_type(), ready_func, run_func);
  }

  /// @brief Insert a method request into the scheduler.
  ///
  /// @param priority Priority of job.
  /// @param run_func Invoked when ready to run.
  ///
  /// @return future associated with the method.
  template <typename RunFunctor>
  boost::unique_future<typename boost::result_of<RunFunctor()>::type>
  insert(priority_type priority, 
         const RunFunctor& run_func)
  {
    return insert(priority, &always_ready, run_func);
  }

  /// @brief Insert a method request with default priority into the
  ///        scheduler.
  ///
  /// @param run_func Invoked when ready to run.
  ///
  /// @param functor Job to run.
  ///
  /// @return future associated with the job.
  template <typename RunFunc>
  boost::unique_future<typename boost::result_of<RunFunc()>::type>
  insert(const RunFunc& run_func)
  {
    return insert(&always_ready, run_func);
  }

  /// @brief Cancel all outstanding request.
  void cancel()
  {
    boost::unique_lock<mutex_type> lock(mutex_);
    activation_list_.clear();
    request_count_ = 0;
  } 

private:

  /// @brief Dispatch a request.
  void dispatch()
  {
    // Get the current highest priority request ready to run from the queue.
    boost::unique_lock<mutex_type> lock(mutex_);
    if (activation_list_.empty()) return;

    // Find the highest priority method ready to run.
    typedef typename activation_list_type::iterator iterator;
    iterator end = activation_list_.end();
    iterator result = std::find_if(
      activation_list_.begin(), end, &is_method_ready);

    // If no methods are ready, then post into dispatch, as the
    // method may have become ready.
    if (end == result)
    {
      io_service_.post(boost::bind(&scheduler::dispatch, this));
      return;
    }

    // Take ownership of request.
    boost::shared_ptr<method_request> method = result->second;
    activation_list_.erase(result);

    // Run method without mutex.
    lock.unlock();
    method->run();    
    lock.lock();

    // Perform bookkeeping.
    --request_count_;
  }

  static bool always_ready() { return true; }

private:

  /// @brief List of outstanding request.
  typedef boost::multi_index_container<
    pair_type,
    boost::multi_index::indexed_by<
      boost::multi_index::ordered_non_unique<
        boost::multi_index::member<pair_type,
                                   typename pair_type::first_type,
                                   &pair_type::first>,
        Compare
      >
    >
  > activation_list_type;
  activation_list_type activation_list_;

  /// @brief Thread group managing threads servicing pool.
  boost::thread_group threads_;

  /// @brief io_service used to function as a thread pool.
  boost::asio::io_service io_service_;

  /// @brief Work is used to keep threads servicing io_service.
  boost::asio::io_service::work work_;

  /// @brief Maximum amount of request.
  const std::size_t max_request_;

  /// @brief Count of outstanding request.
  std::size_t request_count_;

  /// @brief Synchronize access to the activation list.
  typedef boost::mutex mutex_type;
  mutex_type mutex_;
};

typedef scheduler<unsigned int, 
                  std::greater<unsigned int> > high_priority_scheduler;

/// @brief adder is a simple proxy that will delegate work to
///        the scheduler.
class adder
{
public:
  adder(high_priority_scheduler& scheduler)
    : scheduler_(scheduler)
  {}

  /// @brief Add a and b with a priority.
  ///
  /// @return Return future result.
  template <typename T>
  boost::unique_future<T> add(
    high_priority_scheduler::priority_type priority,
    const T& a, const T& b)
  {
    // Insert method request
    return scheduler_.insert(
      priority,
      boost::bind(&adder::do_add<T>, a, b));
  }

  /// @brief Add a and b.
  ///
  /// @return Return future result.
  template <typename T>
  boost::unique_future<T> add(const T& a, const T& b)
  {
    return add(high_priority_scheduler::priority_type(), a, b);
  }

private:

  /// @brief Actual add a and b.
  template <typename T>
  static T do_add(const T& a, const T& b)
  {
    std::cout << "Starting addition of '" << a 
              << "' and '" << b << "'" << std::endl;
    // Mimic busy work.
    boost::this_thread::sleep_for(boost::chrono::seconds(2));
    std::cout << "Finished addition" << std::endl;
    return a + b;
  }

private:
  high_priority_scheduler& scheduler_;
};

bool get(bool& value) { return value; }
void guarded_call()
{
  std::cout << "guarded_call" << std::endl; 
}

int main()
{
  const unsigned int max_threads = 1;
  const unsigned int max_request = 4;

  // Sscheduler
  high_priority_scheduler scheduler(max_threads, max_request);

  // Proxy
  adder adder(scheduler);

  // Client

  // Add guarded method to scheduler.
  bool ready = false;
  std::cout << "Add guarded method." << std::endl;
  boost::unique_future<void> future1 = scheduler.insert(
    boost::bind(&get, boost::ref(ready)),
    &guarded_call);

  // Add 1 + 100 with default priority.
  boost::unique_future<int> future2 = adder.add(1, 100);

  // Force sleep to try to get scheduler to run request 2 first.
  boost::this_thread::sleep_for(boost::chrono::seconds(1));

  // Add:
  //   2 + 200 with low priority (5)
  //   "test" + "this" with high priority (99)
  boost::unique_future<int> future3 = adder.add(5, 2, 200);
  boost::unique_future<std::string> future4 = adder.add(99,
    std::string("test"), std::string("this"));

  // Max request should have been reached, so add another.
  boost::unique_future<int> future5 = adder.add(3, 300);

  // Check if request was added.
  std::cout << "future1 is valid: " << future1.valid()
          << "\nfuture2 is valid: " << future2.valid()
          << "\nfuture3 is valid: " << future3.valid()
          << "\nfuture4 is valid: " << future4.valid()
          << "\nfuture5 is valid: " << future5.valid()
          << std::endl;

  // Get results for future2 and future3.  Do nothing with future4's results.
  std::cout << "future2 result: " << future2.get()
          << "\nfuture3 result: " << future3.get()
          << std::endl;

  std::cout << "Unguarding method." << std::endl;
  ready = true;
  future1.wait();
}

The execution uses thread pool of 1 with a max of 4 request.

  • request1 is guarded until the end of program, and should be last to run.
  • request2 (1 + 100) is inserted with default priority, and should be first to run.
  • request3 (2 + 200) is inserted low priority, and should run after request4.
  • request4 ('test' + 'this') is inserted with high priority, and should run before request3.
  • request5 should fail to insert due to max request, and should not be valid.

The output is as follows:

Add guarded method.
Starting addition of '1' and '100'
future1 is valid: 1
future2 is valid: 1
future3 is valid: 1
future4 is valid: 1
future5 is valid: 0
Finished addition
Starting addition of 'test' and 'this'
Finished addition
Starting addition of '2' and '200'
Finished addition
future2 result: 101
future3 result: 202
Unguarding method.
guarded_call
like image 145
Tanner Sansbury Avatar answered Sep 29 '22 19:09

Tanner Sansbury