Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Extension of boost::asio hangs after being interrupted

Boris' article shows how to write an extension of boost::asio. I tried adding a signal_set and calling async_wait on the registered signals. After the first SIGINT the program hangs until a second SIGINT is sent, but I would like it to shut down cleanly after a single signal.

Here is my code. I test it with gcc-4.6.3 and boost-1.52.0 on Ubuntu.

To compile -

g++ -I/boost_inc -L/boost_lib main.cpp -lpthread -lboost_system -lboost_thread

#include <boost/asio.hpp> 
#include <iostream> 
#include <boost/thread.hpp> 
#include <boost/bind.hpp> 
#include <boost/scoped_ptr.hpp> 
#include <boost/shared_ptr.hpp> 
#include <boost/weak_ptr.hpp> 
#include <cstddef> 

/// I/O object front end for a timer.
///
/// Holds no logic of its own: every call is forwarded to the Service instance
/// registered with the io_service, together with this object's implementation
/// handle.
///
/// @tparam Service  Service type providing wait() and async_wait().
template <typename Service>
class basic_timer : public boost::asio::basic_io_object<Service>
{
  private:
    typedef boost::asio::basic_io_object<Service> io_object_base;

  public:
    /// Binds the timer to io_service through the basic_io_object base.
    explicit basic_timer(boost::asio::io_service &io_service)
      : io_object_base(io_service)
    {
    }

    /// Forwards a wait of the given number of seconds to the service.
    void wait(std::size_t seconds)
    {
      this->service.wait(this->implementation, seconds);
    }

    /// Forwards an asynchronous wait to the service.
    ///
    /// @param seconds  Wait duration passed through to the service.
    /// @param handler  Completion handler passed through to the service.
    template <typename Handler>
    void async_wait(std::size_t seconds, Handler handler)
    {
      this->service.async_wait(this->implementation, seconds, handler);
    }
};

// Forward declaration: timer_impl is used as the default template argument of
// basic_timer_service below and is defined after that class.
class timer_impl;

/// Service backing basic_timer: blocking waits are executed on a private
/// worker thread so that async_wait() can return immediately.
///
/// The service owns a private io_service (async_io_service_) that is run by
/// async_thread_. Each async_wait() posts a wait_operation to that private
/// io_service; once the blocking wait is over, the operation posts the user's
/// handler to the io_service that owns this service.
///
/// @tparam TimerImplementation  Backend type; it must provide
///         wait(std::size_t, boost::system::error_code&) and destroy().
template <typename TimerImplementation = timer_impl>
class basic_timer_service
  : public boost::asio::io_service::service
{
  public:
    /// Identifier under which the owning io_service registers this service.
    static boost::asio::io_service::id id;

    /// Handle stored inside every basic_timer object.
    typedef boost::shared_ptr<TimerImplementation> implementation_type;

    /// Registers the service with io_service and starts the worker thread.
    /// async_work_ keeps the private io_service's run() from returning while
    /// there is nothing queued.
    explicit basic_timer_service(boost::asio::io_service &io_service)
      : boost::asio::io_service::service(io_service),
      async_work_(new boost::asio::io_service::work(async_io_service_)),
      async_thread_(
        boost::bind(&boost::asio::io_service::run, &async_io_service_))
    {}

    /// Stops the worker if shutdown_service() has not already done so.
    ~basic_timer_service()
    {
      stop_worker();
    }

    /// Creates the backend object for a newly constructed timer.
    void construct(implementation_type &impl)
    {
      impl.reset(new TimerImplementation());
    }

    /// Notifies the backend and drops this timer's reference to it.
    /// Operations still queued only hold a weak reference (see
    /// wait_operation), so they complete with operation_aborted afterwards.
    void destroy(implementation_type &impl)
    {
      impl->destroy();
      impl.reset();
    }

    /// Blocking wait on the calling thread.
    /// The resulting error code is handed to boost::asio::detail::throw_error,
    /// the asio helper that turns a set error code into an exception.
    void wait(implementation_type &impl, std::size_t seconds)
    {
      boost::system::error_code ec;
      impl->wait(seconds, ec);
      boost::asio::detail::throw_error(ec);
    }

    /// Function object executed on the worker thread for one async_wait().
    template <typename Handler>
    class wait_operation
    {
      public:
        /// @param impl        Timer backend; stored as a weak reference.
        /// @param io_service  io_service that receives the handler.
        /// @param seconds     Wait duration passed to the backend.
        /// @param handler     User completion handler (copied).
        wait_operation(
          implementation_type &impl,
          boost::asio::io_service &io_service,
          std::size_t seconds, Handler handler)
          : impl_(impl),
          io_service_(io_service),
          work_(io_service),
          seconds_(seconds),
          handler_(handler)
        {}

        /// Performs the blocking wait, then posts the handler to io_service_.
        /// If the timer is gone or io_service_ has been stopped, no wait is
        /// done and the handler is posted with operation_aborted instead.
        void operator()() const
        {
          implementation_type impl = impl_.lock();
          if (!io_service_.stopped() && impl)
          {
            boost::system::error_code ec;
            impl->wait(seconds_, ec);
            this->io_service_.post(
              boost::asio::detail::bind_handler(handler_, ec));
          }
          else
          {
            this->io_service_.post(
              boost::asio::detail::bind_handler(
                handler_, boost::asio::error::operation_aborted));
          }
        }

      private:
        boost::weak_ptr<TimerImplementation> impl_;
        boost::asio::io_service &io_service_;
        // Outstanding-work marker on the target io_service for as long as
        // this operation (or a copy of it) exists.
        boost::asio::io_service::work work_;
        std::size_t seconds_;
        Handler handler_;
    };

    /// Queues a wait on the worker thread and returns immediately.
    template <typename Handler>
    void async_wait(
      implementation_type &impl,
      std::size_t seconds, Handler handler)
    {
      this->async_io_service_.post(
        wait_operation<Handler>(
          impl, this->get_io_service(), seconds, handler));
    }

  private:
    /// Called by the owning io_service while it is shutting down, before any
    /// service is deleted. The worker is stopped here, not only in the
    /// destructor, so that it no longer posts to the owning io_service once
    /// that io_service has started tearing its services down.
    void shutdown_service()
    {
      stop_worker();
    }

    /// Stops the private io_service and joins the worker thread.
    /// Safe to call more than once.
    void stop_worker()
    {
      async_work_.reset();
      async_io_service_.stop();
      // NOTE(review): the original author reported this join() blocking until
      // a second SIGINT arrived (gcc 4.6.3, Boost 1.52.0). The cause is not
      // established by the code in this file.
      if (async_thread_.joinable())
        async_thread_.join();
    }

    // Declaration order matters: the constructor initializes async_work_ and
    // async_thread_ from async_io_service_.
    boost::asio::io_service async_io_service_;
    boost::scoped_ptr<boost::asio::io_service::work> async_work_;
    boost::thread async_thread_;
};

/// Minimal timer backend used by basic_timer_service: a wait is a plain
/// blocking sleep.
class timer_impl
{
  public:
    timer_impl()
    {
    }

    ~timer_impl()
    {
    }

    /// Hook invoked by the service before it releases the implementation.
    /// There is nothing to tear down here.
    void destroy()
    {
    }

    /// Blocks the calling thread in sleep(seconds) and then reports success.
    ///
    /// @param seconds  Duration handed to sleep().
    /// @param ec       Reset to the "no error" state on return.
    void wait(std::size_t seconds, boost::system::error_code &ec)
    {
      sleep(seconds);
      ec.clear();
    }
};


// Convenience alias: a basic_timer driven by basic_timer_service with its
// default backend (timer_impl).
typedef basic_timer<basic_timer_service<> > timer; 

// Out-of-class definition of the static service id declared inside
// basic_timer_service.
template <typename TimerImplementation> 
boost::asio::io_service::id basic_timer_service<TimerImplementation>::id; 
/// Completion handler for timer::async_wait.
///
/// Prints the success line only when the wait actually succeeded; a failed or
/// aborted wait (for example operation_aborted) is reported on std::cerr
/// instead of being silently treated as success.
///
/// @param ec  Result of the wait as posted by the timer service.
void wait_handler(const boost::system::error_code &ec)
{
  if (ec)
  {
    std::cerr << "Error: " << ec.message() << "\n";
    return;
  }
  // NOTE: the text says "5 s." although main() below waits 2 seconds; the
  // literal is kept unchanged.
  std::cout << "5 s." << std::endl;
}

int main() 
{ 
  {
    boost::asio::io_service io_service; 
    boost::asio::signal_set signals(io_service);
    timer t(io_service);

    signals.add(SIGINT);
    signals.async_wait(
      boost::bind(&boost::asio::io_service::stop, &io_service));

    t.async_wait(2, wait_handler); 

    std:: cout << "async called\n" ;
    io_service.run(); 
  } 

  { // this block will not be executed
    boost::asio::io_service io_service; 
    timer t(io_service); 

    t.async_wait(2, wait_handler); 
    std:: cout << "async called\n" ;
    io_service.run(); 
  }
  return 0;
} 
like image 347
Acer Avatar asked Oct 23 '12 06:10

Acer


1 Answers

After trying an example offered by the author of asio, I ran into the same behavior. I therefore dug into the library source and found that it uses the interfaces of io_service_impl rather than those of io_service. Furthermore, an operation functor posted to io_service_impl is different from the ones invoked by io_service. Taking all this together, I decided to rewrite the timer example on top of asio's internal interfaces.

I hereby present the rewritten timer example.

#include <boost/asio.hpp> 
#include <iostream> 
#include <boost/thread.hpp> 
#include <boost/bind.hpp> 
#include <boost/scoped_ptr.hpp> 
#include <boost/shared_ptr.hpp> 
#include <boost/weak_ptr.hpp> 
#include <cstddef> 

#define get_service_impl(X) \
  ba::use_service<bad::io_service_impl>(X)

namespace ba = boost::asio;
namespace bad = boost::asio::detail;

// Nothing changed
/// Timer I/O object: a thin shell that hands every request to Service along
/// with the per-object implementation handle kept by basic_io_object.
///
/// @tparam Service  Service type providing wait() and async_wait().
template <typename Service>
class basic_timer : public boost::asio::basic_io_object<Service>
{
public:
  /// Attaches the timer to io_service via the basic_io_object base class.
  explicit basic_timer(boost::asio::io_service &io_service)
    : boost::asio::basic_io_object<Service>(io_service)
  {
  }

  /// Synchronous wait, delegated to the service.
  ///
  /// @param seconds  Wait duration passed through to the service.
  void wait(std::size_t seconds)
  {
    Service &svc = this->service;
    svc.wait(this->implementation, seconds);
  }

  /// Asynchronous wait, delegated to the service.
  ///
  /// @param seconds  Wait duration passed through to the service.
  /// @param handler  Completion handler passed through to the service.
  template <typename Handler>
  void async_wait(std::size_t seconds, Handler handler)
  {
    Service &svc = this->service;
    svc.async_wait(this->implementation, seconds, handler);
  }
};

// Nothing changed
/// Timer backend: waiting is implemented as a blocking sleep.
class timer_impl
{
public:
  /// Sleeps via sleep(seconds), then marks the operation as successful.
  ///
  /// @param seconds  Duration handed to sleep().
  /// @param ec       Reset to the "no error" state on return.
  void wait(std::size_t seconds, boost::system::error_code &ec)
  {
    sleep(seconds);
    ec.clear();
  }
};

// ----- Change a lot! --------
// Timer service written against asio's internal io_service_impl interface.
//
// Blocking waits are executed by a background thread that runs a private
// io_service (work_io_service_). Each wait_operation is first posted to the
// private io_service, where it performs the blocking call, and then re-posts
// itself to the owning io_service, where the user's handler is invoked.
class basic_timer_service 
: public boost::asio::io_service::service 
{ 
public:
  // Handle type borrowed from asio's socket_ops. construct() stores a
  // timer_impl in it; wait() and wait_operation cast the stored pointer back
  // to timer_impl*.
  typedef boost::asio::detail::socket_ops::shared_cancel_token_type 
    implementation_type;

  // Identifier under which the io_service registers this service.
  static boost::asio::io_service::id id; 

  // Caches the implementation services of both the owning io_service and a
  // newly created private io_service, and puts a work object on the private
  // one. The background thread is not started here; see start_work_thread().
  explicit basic_timer_service(boost::asio::io_service &io_service) 
    : boost::asio::io_service::service(io_service), 
      io_service_impl_(get_service_impl(io_service)),
      work_io_service_( new boost::asio::io_service ),
      work_io_service_impl_(get_service_impl(*work_io_service_)),
      work_(new ba::io_service::work(*work_io_service_)),
      work_thread_() // thread is created lazily by the first async_wait()
  {} 

  // Runs the same teardown as shutdown_service(); that function checks each
  // pointer before using it, so a repeated call does nothing further.
  ~basic_timer_service() 
  {  shutdown_service(); } 

  // Creates the timer_impl owned by a new timer object.
  void construct(implementation_type &impl) 
  { impl.reset(new timer_impl()); } 

  // Replaces the handle with a null pointer managed by noop_deleter, which
  // drops this handle's reference to the previous timer_impl. Operations that
  // then fail to lock their weak token complete with operation_aborted (see
  // wait_operation::do_complete). No caller in this file invokes cancel().
  void cancel(implementation_type &impl)
  {
        impl.reset((void*)0, boost::asio::detail::socket_ops::noop_deleter());
  }

  // Drops the timer object's reference to its implementation.
  void destroy(implementation_type &impl) 
  { impl.reset(); } 

  // Releases the work object, stops the private io_service, joins the
  // background thread if one was started, and destroys the private
  // io_service.
  // NOTE(review): work_io_service_impl_ was obtained from *work_io_service_,
  // so it should not be used after the reset() at the end - confirm no
  // async_wait() can arrive after shutdown.
  void shutdown_service() 
  {
    work_.reset();
    if(work_io_service_.get()){
      work_io_service_->stop();
      if (work_thread_.get()){
        work_thread_->join();
        work_thread_.reset();
      }
    }
    work_io_service_.reset();
  } 

  // Blocking wait on the calling thread; the resulting error code is passed
  // to boost::asio::detail::throw_error.
  void wait(implementation_type &impl, std::size_t seconds) 
  { 
    boost::system::error_code ec; 
    // XXX (original author): not sure this cast is safe. It relies on impl
    // holding the timer_impl stored by construct(). After cancel() the stored
    // pointer is null, so the call below would dereference a null pointer.
    timer_impl *impl_ptr = static_cast<timer_impl*>(impl.get());
    impl_ptr->wait(seconds, ec); 
    boost::asio::detail::throw_error(ec); 
  } 

  // One pending async_wait(), expressed as an asio-internal operation.
  // do_complete() is registered as the completion function in the
  // constructor and handles both stages of the operation's life.
  template <typename Handler> 
  class wait_operation
  : public boost::asio::detail::operation
  { 
  public: 
    // asio-internal macro; it provides the nested `ptr` helper type used in
    // do_complete() and in async_wait().
    BOOST_ASIO_DEFINE_HANDLER_PTR(wait_operation);

    // cancel_token: weak token for the timer's implementation handle
    // seconds:      wait duration handed to timer_impl::wait
    // ios:          implementation of the io_service that runs the handler
    // handler:      user completion handler (copied)
    wait_operation(
      bad::socket_ops::weak_cancel_token_type cancel_token,
      std::size_t seconds, 
      bad::io_service_impl& ios,
      Handler handler) 
      : bad::operation(&wait_operation::do_complete), 
      cancel_token_(cancel_token),
      seconds_(seconds),
      io_service_impl_(ios),
      handler_(handler) 
    {} 

    // Completion function for both stages.
    //  - owner is non-null and differs from io_service_impl_: running on the
    //    private io_service; do the blocking wait, then re-post this same
    //    operation to the main io_service.
    //  - otherwise: running on the main io_service (owner non-null), or
    //    owner is null (presumably the operation is being destroyed without
    //    being run - TODO confirm); the operation is released and the handler
    //    is invoked only when owner is non-null.
    static void do_complete(
      bad::io_service_impl *owner,
      bad::operation *base,
      boost::system::error_code const & /* ec */ ,
      std::size_t /* byte_transferred */ )
    { 
      wait_operation *o(static_cast<wait_operation*>(base));
      ptr p = { boost::addressof(o->handler_), o, o};

      // Distinguish between main io_service and private io_service
      if(owner && owner != &o->io_service_impl_)
      { // private io_service

        // Start blocking call
        bad::socket_ops::shared_cancel_token_type lock =
          o->cancel_token_.lock();

        if(!lock){
          // The implementation is no longer alive: report an aborted wait.
          o->ec_ = boost::system::error_code(
            ba::error::operation_aborted,
                            boost::system::system_category());
        }else{
          timer_impl *impl = static_cast<timer_impl*>(lock.get());
          impl->wait(o->seconds_, o->ec_);
        }
        // End of blocking call

        // Hand the operation over to the main io_service, then clear p so it
        // no longer refers to the operation that was just re-posted
        // (presumably p would otherwise release it on scope exit).
        o->io_service_impl_.post_deferred_completion(o);
        p.v = p.p = 0;
      }else{ // main io_service
        // Copy the handler and result into a local binder and point p at the
        // copy, so that p.reset() can dispose of the operation before the
        // handler is called.
        bad::binder1<Handler, boost::system::error_code> 
          handler(o->handler_, o->ec_);
        p.h = boost::addressof(handler.handler_);
        p.reset();
        if(owner){
          bad::fenced_block b(bad::fenced_block::half);
          boost_asio_handler_invoke_helpers::invoke(
                        handler, handler.handler_);
        }
      } 
    } 

  private: 
    bad::socket_ops::weak_cancel_token_type cancel_token_; 
    std::size_t seconds_; 
    bad::io_service_impl &io_service_impl_; // main io_service implementation
    Handler handler_; 
    boost::system::error_code ec_;          // result of the blocking wait
  }; 

  // Starts an asynchronous wait: allocates a wait_operation through the
  // handler allocation helper, constructs it in place, and hands it to
  // start_op(). p is cleared afterwards so it no longer refers to the
  // operation.
  template <typename Handler> 
  void async_wait(
    implementation_type &impl, 
    std::size_t seconds, Handler handler) 
  { 
    typedef wait_operation<Handler> op;
    typename op::ptr p = {
      boost::addressof(handler),
      boost_asio_handler_alloc_helpers::allocate(
        sizeof(op), handler), 0};
    p.p = new (p.v) op(impl, seconds, io_service_impl_, handler);
    start_op(p.p);
    p.v = p.p = 0;
  }

protected:
  // Functor for running the background thread: runs the given io_service.
  class work_io_service_runner
  {
  public:  
    work_io_service_runner(ba::io_service &io_service)
      : io_service_(io_service) {}

    void operator()(){ io_service_.run(); }
  private:
    ba::io_service &io_service_;
  };

  // Makes sure the background thread exists, calls work_started() on the
  // main io_service implementation, and posts the operation to the private
  // io_service for its blocking stage.
  void start_op(bad::operation* op)
  {
    start_work_thread();
    io_service_impl_.work_started();
    work_io_service_impl_.post_immediate_completion(op);
  }

  // Creates the background thread on first use; mutex_ is held while
  // work_thread_ is checked and set.
  void start_work_thread()
  {
    bad::mutex::scoped_lock lock(mutex_);
    if (!work_thread_.get())
    {
      work_thread_.reset(new bad::thread(
          work_io_service_runner(*work_io_service_)));
    }
  }

  // Implementation of the io_service that owns this service.
  bad::io_service_impl& io_service_impl_;

private: 
  bad::mutex mutex_;                                 // locked in start_work_thread()
  boost::scoped_ptr<ba::io_service> work_io_service_; // private io_service
  bad::io_service_impl &work_io_service_impl_;        // its implementation
  boost::scoped_ptr<ba::io_service::work> work_;      // work on the private io_service
  boost::scoped_ptr<bad::thread> work_thread_;        // runs work_io_service_
}; 

// Definition of the static service id declared inside basic_timer_service.
boost::asio::io_service::id basic_timer_service::id; 

// Convenience alias: a basic_timer driven by basic_timer_service.
typedef basic_timer<basic_timer_service> timer; 

/// Completion handler for timer::async_wait.
///
/// A failed wait is reported on std::cerr with the error's message; a
/// successful one prints a confirmation line on std::cout.
///
/// @param ec  Result of the wait.
void wait_handler(const boost::system::error_code &ec)
{
  if (ec)
  {
    std::cerr << "Error: " << ec.message() << "\n";
    return;
  }

  std::cout << "wait_handler is called\n";
}

int main() 
{ 
  {
    boost::asio::io_service io_service; 
    boost::asio::signal_set signals(io_service);
    timer t(io_service); 

    signals.add(SIGINT);
    signals.async_wait(
      boost::bind(&boost::asio::io_service::stop, &io_service));

    t.async_wait(2, wait_handler); 

    std:: cout << "async called\n" ;
    io_service.run(); 
    std:: cout << "exit loop\n";
  } 

  {
    boost::asio::io_service io_service; 
    timer t(io_service); 

    t.async_wait(2, wait_handler); 
    std:: cout << "async called\n" ;
    io_service.run(); 
  }
  return 0;
} 

To compile

g++ -I/boost_inc -L/boost_lib main.cpp -lpthread -lboost_system -lboost_thread

The new timer works fine. Still I would like to know how to write a non-intrusive extension of asio.

like image 54
Acer Avatar answered Nov 15 '22 11:11

Acer