
Is there any way to asynchronously wait for a future in Boost Asio?

My problem is the following. I start several operations asynchronously, and I want to continue until all of them are finished. Using Boost Asio, the most straightforward way to do this is the following. Suppose tasks is some kind of container of objects that support some asynchronous operation.

tasksToGo = tasks.size();
for (auto task: tasks) {
    task.async_do_something([](const boost::system::error_code& ec)
    {
        if (ec) {
            // handle error
        } else {
            if (--tasksToGo == 0) {
                 tasksFinished();
            }
        }
    });
}

The problem with this solution is that it feels like a workaround. In Boost 1.54 I can do it with futures, but then I can only wait synchronously, which is only possible from a thread other than the one where run() is called.

for (auto task: tasks) {
    futures.push_back(task.async_do_something(boost::asio::use_future));
}

for (auto& future: futures) {
    future.wait();
}

This code is much clearer than the previous one, but I need a separate thread which I don't want. I want something that can be used like this:

for (auto task: tasks) {
    futures.push_back(task.async_do_something(boost::asio::use_future));
}

boost::asio::spawn(ioService, [](boost::asio::yield_context yield)
{
    for (auto& future: futures) {
        future.async_wait(yield);
    }
    tasksFinished();
});

Is there anything that can be used similarly?

petersohn asked Aug 13 '13 06:08



1 Answer

As far as I know, there is currently no first-class support for this. However, given the direction of the library, I would be surprised if this functionality was not available in the future.

A few papers have been proposed to add support for this type of functionality:

  • N3558 - A Standardized Representation of Asynchronous Operations is particularly interesting. It proposes when_all(futures) and future.then(). If it is implemented, then it would be possible to represent the asynchronous chain as:

    for (auto task: tasks) {
        futures.push_back(task.async_do_something(boost::asio::use_future));
    }
    when_all(futures).then(&tasksFinished);
    
  • N3562 - Executors and schedulers introduces executors, which can be used to provide finer control over the context in which an async operation executes. For Boost.Asio, this would likely require providing some type of executor that defers to the io_service.

While work on these papers is still ongoing, it may be worthwhile to periodically check Boost.Thread's Conformance and Extension page and Boost.Asio's GitHub for early adoptions of these proposals.
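Until something like when_all is available, the counting from the question can at least be hidden behind a small helper. The following is only a minimal sketch using the standard library (C++11); join_state, counted_handler and join_counter are hypothetical names that are not part of Boost.Asio or of either proposal, and the continuation simply runs on whichever thread invokes the last wrapped handler.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <utility>

// Shared state: how many handlers are still outstanding, and what to run
// once none are left.
struct join_state {
  join_state(std::size_t count, std::function<void()> on_done)
    : remaining(count), done(std::move(on_done)) {}
  std::atomic<std::size_t> remaining;
  std::function<void()> done;
};

// Wraps a completion handler; forwards its arguments, then counts down.
template <typename Handler>
struct counted_handler {
  Handler handler;
  std::shared_ptr<join_state> state;

  template <typename... Args>
  void operator()(Args&&... args) {
    handler(std::forward<Args>(args)...);
    // fetch_sub returns the previous value, so 1 means this was the last one.
    if (state->remaining.fetch_sub(1) == 1)
      state->done();
  }
};

// Hypothetical helper: expects exactly `count` wrapped handlers and runs
// `on_done` after all of them have been invoked.
class join_counter {
public:
  join_counter(std::size_t count, std::function<void()> on_done)
    : state_(std::make_shared<join_state>(count, std::move(on_done))) {}

  template <typename Handler>
  counted_handler<Handler> wrap(Handler handler) const {
    return counted_handler<Handler>{std::move(handler), state_};
  }

private:
  std::shared_ptr<join_state> state_;
};

// Example: two handlers taking an int stand in for Asio completion handlers.
inline int join_counter_example() {
  int handled = 0;
  int finished = 0;
  join_counter all_done(2, [&finished] { ++finished; });

  auto h1 = all_done.wrap([&handled](int) { ++handled; });
  auto h2 = all_done.wrap([&handled](int) { ++handled; });

  h1(0);
  assert(finished == 0);  // One handler is still outstanding.
  h2(0);
  return handled * 10 + finished;  // Encodes both counters in one value.
}
```

With Asio, the wrapped object would be passed wherever a completion handler is expected, for example task.async_do_something(all_done.wrap(handler)). Unlike the chain shown further down, this sketch does not post the continuation into an io_service and never fires if count is 0.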


I needed this functionality a year ago with a much earlier version of Boost, so I worked on my own solution. There are still some rough areas with regard to the semantics, but it may be helpful as reference material until something official is adopted. Before I provide the code, here is an example application based on your question:

#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include "async_ops.hpp"

void handle_timer(const boost::system::error_code& error, int x)
{
  std::cout << "in handle timer: " << x << " : "
            << error.message() <<  std::endl;
}

void a() { std::cout << "a" << std::endl; }
void b() { std::cout << "b" << std::endl; }
void c() { std::cout << "c" << std::endl; }

int main()
{
  boost::asio::io_service io_service;
  boost::asio::deadline_timer timer1(io_service);
  boost::asio::deadline_timer timer2(io_service);

  // Create a chain that will continue once 2 handlers have been executed.
  chain all_expired = when_all(io_service, 2);

  all_expired.then(&a)  // Once 2 handlers finish, run a within io_service.
             .then(&b)  // Once a has finished, run b within io_service.
             .then(&c); // Once b has finished, run c within io_service.

  // Set expiration times for timers.
  timer1.expires_from_now(boost::posix_time::seconds(2));
  timer2.expires_from_now(boost::posix_time::seconds(5));

  // Asynchronously wait for the timers, wrapping the handlers with the chain.
  timer1.async_wait(all_expired.wrap(
      boost::bind(&handle_timer, boost::asio::placeholders::error, 1)));
  timer2.async_wait(all_expired.wrap(
      boost::bind(&handle_timer, boost::asio::placeholders::error, 2)));

  // Run the io_service.
  io_service.run();
}

This produces the following output:

in handle timer: 1 : Success
in handle timer: 2 : Success
a
b
c

And here is async_ops.hpp:

#include <algorithm>
#include <vector>
#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/bind/protect.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/foreach.hpp>
#include <boost/function.hpp>
#include <boost/make_shared.hpp>
#include <boost/range/iterator_range.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread/locks.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/type_traits/is_integral.hpp>
#include <boost/type_traits/remove_reference.hpp>
#include <boost/utility/enable_if.hpp>

class chain;

namespace detail {

/// @brief Chained handler connects two handlers together that will
///        be called sequentially.
///
/// @note Type erasure is not performed on Handler1 to allow resolving
///       to the correct asio_handler_invoke via ADL.
template <typename Handler1> 
class chained_handler
{
public:
  template <typename Handler2>
  chained_handler(Handler1 handler1, Handler2 handler2)
    : handler1_(handler1),
      handler2_(handler2)
  {}

  void operator()()
  {
    handler1_();
    handler2_();
  }

  template <typename Arg1>
  void operator()(const Arg1& a1)
  {
    handler1_(a1);
    handler2_();
  }

  template <typename Arg1, typename Arg2>
  void operator()(const Arg1& a1, const Arg2& a2)
  {
    handler1_(a1, a2);
    handler2_();
  }

//private:
  Handler1 handler1_;
  boost::function<void()> handler2_;
};

/// @brief Hook that allows the chained_handler to be invoked
///        within a specific context based on the handler's type.
template <typename Function,
          typename Handler>
void asio_handler_invoke(
  Function function,
  chained_handler<Handler>* handler)
{
  boost_asio_handler_invoke_helpers::invoke(
    function, handler->handler1_);
}

/// @brief No operation.
inline void noop() {}

/// @brief io_service_executor is used to wrap handlers, providing a
///        deferred posting to an io_service.  This allows for chains
///        to inherit io_services from other chains.
class io_service_executor
  : public boost::enable_shared_from_this<io_service_executor>
{
public:
  /// @brief Constructor.
  explicit
  io_service_executor(boost::asio::io_service* io_service)
    : io_service_(io_service)
  {}

  /// @brief Wrap a handler, returning a functor that will post the
  ///        provided handler into the io_service.
  ///
  /// @param handler Handler to be wrapped for deferred posting.
  /// @return Functor that will post handler into io_service.
  template <typename Handler>
  boost::function<void()> wrap(Handler handler)
  {
    // By binding to the io_service_executor's post, the io_service
    // into which the handler can be posted can be specified at a later 
    // point in time.
    return boost::bind(&io_service_executor::post<Handler>,
                       shared_from_this(), handler);
  }

  /// @brief Set the io_service.
  void io_service(boost::asio::io_service* io_service)
  {
    io_service_ = io_service;
  }

  /// @brief Get the io_service.
  boost::asio::io_service* io_service()
  {
    return io_service_;
  }

private:

  /// @brief Post handler into the io_service.
  ///
  /// @param handler The handler to post.
  template <typename Handler>
  void post(Handler handler)
  {
    io_service_->post(handler);
  }

private:
  boost::asio::io_service* io_service_;
};

/// @brief chain_impl is an implementation for a chain.  It is responsible
///        for lifetime management, tracking posting and wrapped functions,
///        as well as determining when run criteria has been satisfied.
class chain_impl
  : public boost::enable_shared_from_this<chain_impl>
{
public:

  /// @brief Constructor.
  chain_impl(boost::shared_ptr<io_service_executor> executor,
             std::size_t required)
    : executor_(executor),
      required_(required)
  {}

  /// @brief Destructor will invoke all posted handlers.
  ~chain_impl()
  {
    run();
  }

  /// @brief Post a handler that will be posted into the executor 
  ///        after run criteria has been satisfied.
  template <typename Handler>
  void post(const Handler& handler)
  {
    deferred_handlers_.push_back(executor_->wrap(handler));
  }

  /// @brief Wrap a handler, returning a chained_handler.  The returned
  ///        handler will notify the impl when it has been invoked.
  template <typename Handler>
  chained_handler<Handler> wrap(const Handler& handler)
  {
    return chained_handler<Handler>(
      handler,                                                 // handler1
      boost::bind(&chain_impl::complete, shared_from_this())); // handler2
  }

  /// @brief Force run of posted handlers.
  void run()
  {
    boost::unique_lock<boost::mutex> guard(mutex_);
    run(guard);
  }

  /// @brief Get the executor.
  boost::shared_ptr<io_service_executor> executor() { return executor_; }

private:

  /// @brief Completion handler invoked when a wrapped handler has been
  ///        invoked.
  void complete()
  {
    boost::unique_lock<boost::mutex> guard(mutex_);

    // Update tracking.
    if (required_)
      --required_;

    // If criteria has not been met, then return early.
    if (required_) return;

    // Otherwise, run the handlers.
    run(guard);    
  }

  /// @brief Run handlers.
  void run(boost::unique_lock<boost::mutex>& guard)
  {
    // While locked, swap handlers into a temporary.
    std::vector<boost::function<void()> > handlers;
    using std::swap;
    swap(handlers, deferred_handlers_);

    // Run handlers without mutex.
    guard.unlock();
    BOOST_FOREACH(boost::function<void()>& handler, handlers)
        handler();
    guard.lock();
  }

private:
  boost::shared_ptr<io_service_executor> executor_;
  boost::mutex mutex_;
  std::size_t required_;
  std::vector<boost::function<void()> > deferred_handlers_;
};

/// @brief Functor used to wrap and post handlers or chains between two
///        implementations.
struct wrap_and_post
{
  wrap_and_post(
    boost::shared_ptr<detail::chain_impl> current,
    boost::shared_ptr<detail::chain_impl> next
  )
    : current_(current),
      next_(next)
  {}

  /// @brief Wrap a handler with next, then post into current.
  template <typename Handler>
  void operator()(Handler handler)
  {
    // Wrap the handler with the next implementation, then post into the
    // current.  The wrapped handler will keep next alive, and posting into
    // current will cause next::complete to be invoked when current is run.
    current_->post(next_->wrap(handler));
  }

  /// @brief Wrap an entire chain, posting into the current.
  void operator()(chain chain);

private:
  boost::shared_ptr<detail::chain_impl> current_;
  boost::shared_ptr<detail::chain_impl> next_;
};

} // namespace detail

/// @brief Used to indicate that a chain will inherit its service from an
///        outer chain.
class inherit_service_type {};
static inherit_service_type inherit_service;

/// @brief Chain represents an asynchronous call chain, allowing the overall
///        chain to be constructed in a verbose and explicit manner.
class chain
{
public:

  /// @brief Constructor.
  ///
  /// @param io_service The io_service in which the chain will run.
  explicit
  chain(boost::asio::io_service& io_service)
    : impl_(boost::make_shared<detail::chain_impl>(
              boost::make_shared<detail::io_service_executor>(&io_service),
              0)),
      root_impl_(impl_)
  {}

  /// @brief Constructor.  The chain will inherit its io_service from an
  ///        outer chain.
  explicit
  chain(inherit_service_type)
    : impl_(boost::make_shared<detail::chain_impl>(
              boost::make_shared<detail::io_service_executor>(
                static_cast<boost::asio::io_service*>(NULL)),
              0)),
      root_impl_(impl_)
  {}

  /// @brief Force run posted handlers.
  void run()
  {
    root_impl_->run();
  }

  /// @brief Chain link that will complete when the amount of wrapped
  ///        handlers is equal to required.
  ///
  /// @param required The amount of handlers required to be complete.
  template <typename T>
  typename boost::enable_if<boost::is_integral<
    typename boost::remove_reference<T>::type>, chain>::type
  any(std::size_t required = 1)
  {
    return chain(root_impl_, required);
  }

  /// @brief Chain link that wraps all handlers in container, and will
  ///        be complete when the amount of wrapped handlers is equal to
  ///        required.
  ///
  /// @param container Container of handlers to wrap.
  /// @param required The amount of handlers required to be complete.
  template <typename Container>
  typename boost::disable_if<boost::is_integral<
    typename boost::remove_reference<Container>::type>, chain>::type
  any(const Container& container, 
      std::size_t required = 1)
  {
    return post(container, required);
  }

  /// @brief Chain link that wraps all handlers in iterator range, and will
  ///        be complete when the amount of wrapped handlers is equal to
  ///        required.
  ///
  /// @param begin Start of the range of handlers to wrap.
  /// @param end End of the range of handlers to wrap.
  /// @param required The amount of handlers required to be complete.
  template <typename Iterator>
  chain any(Iterator begin, Iterator end,
            std::size_t required = 1)
  {
    return any(boost::make_iterator_range(begin, end), required);
  }

  /// @brief Chain link that will complete when the amount of wrapped
  ///        handlers is equal to required.
  ///
  /// @param required The amount of handlers required to be complete.
  template <typename T>
  typename boost::enable_if<boost::is_integral<
    typename boost::remove_reference<T>::type>, chain>::type
  all(T required)
  {
    return any<T>(required);
  }

  /// @brief Chain link that wraps all handlers in container, and will
  ///        be complete when all wrapped handlers from the container 
  ///        have been executed.
  ///
  /// @param container Container of handlers to wrap.
  template <typename Container>
  typename boost::disable_if<boost::is_integral<
    typename boost::remove_reference<Container>::type>, chain>::type
  all(const Container& container)
  {
    return any(container, container.size());
  }

  /// @brief Chain link that wraps all handlers in iterator range, and will
  ///        be complete when all wrapped handlers from the iterator range
  ///        have been executed.
  ///
  /// @param begin Start of the range of handlers to wrap.
  /// @param end End of the range of handlers to wrap.
  template <typename Iterator>
  chain all(Iterator begin, Iterator end)
  {
    return all(boost::make_iterator_range(begin, end));
  }

  /// @brief Chain link that represents a single sequential link.
  template <typename Handler>
  chain then(const Handler& handler)
  {
    boost::array<Handler, 1> handlers = {{handler}};
    return all(handlers);
  }

  /// @brief Wrap a handler, returning a chained_handler.
  template <typename Handler>
  detail::chained_handler<Handler> wrap(const Handler& handler)
  {
    return impl_->wrap(handler);
  }

  /// @brief Set the executor.
  void executor(boost::asio::io_service& io_service)
  {
    impl_->executor()->io_service(&io_service);
  }

  /// @brief Check if this chain should inherit its executor.
  bool inherits_executor()
  {
    return !impl_->executor()->io_service();
  }

private:

  /// @brief Private constructor used to create links in the chain.
  ///
  /// @note All links maintain a handle to the root impl.  When constructing a
  ///       chain, this allows for links later in the chain to be stored as
  ///       non-temporaries.
  chain(boost::shared_ptr<detail::chain_impl> root_impl,
        std::size_t required)
    : impl_(boost::make_shared<detail::chain_impl>(
              root_impl->executor(), required)),
      root_impl_(root_impl)
  {}

  /// @brief Create a new chain link, wrapping handlers and posting into
  ///        the current chain.
  template <typename Container>
  chain post(const Container& container,
             std::size_t required)
  {
    // Create next chain.
    chain next(root_impl_, required);

    // Wrap handlers from the next chain, and post into the current chain.
    std::for_each(container.begin(), container.end(),
                  detail::wrap_and_post(impl_, next.impl_));

    return next;
  }

private:
  boost::shared_ptr<detail::chain_impl> impl_;
  boost::shared_ptr<detail::chain_impl> root_impl_;
};

inline void detail::wrap_and_post::operator()(chain c)
{
  // If next does not have an executor, then inherit from current.
  if (c.inherits_executor())
      c.executor(*current_->executor()->io_service());

  // When current completes, start the chain.
  current_->post(boost::protect(boost::bind(&chain::run, c)));

  // The next impl needs to be aware of when the chain stops, so
  // wrap a noop and append it to the end of the chain.
  c.then(next_->wrap(&detail::noop));  
}

// Convenience functions.
template <typename T, typename Handler>
chain async(T& t, const Handler& handler)
{
  return chain(t).then(handler);
}

template <typename T,
          typename Container>
chain when_all(T& t, const Container& container)
{
  return chain(t).all(container);
}

template <typename T,
          typename Iterator>
chain when_all(T& t, Iterator begin, Iterator end)
{
  return chain(t).all(begin, end);
}

template <typename T,
          typename Container>
chain when_any(T& t, const Container& container)
{
  return chain(t).any(container);
}

template <typename T,
          typename Iterator>
chain when_any(T& t, Iterator begin, Iterator end)
{
  return chain(t).any(begin, end);
}
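
Stripped of the executor and chaining layers, the core of chain_impl is a countdown plus a list of deferred handlers that are released once the countdown reaches zero. Here is a minimal sketch of just that idea, using only the standard library. mini_link is a hypothetical name, and it invokes the deferred handlers directly instead of posting them into an io_service.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for detail::chain_impl.
class mini_link {
public:
  explicit mini_link(std::size_t required) : required_(required) {}

  // Queue a handler to be released once the countdown reaches zero.
  void post(std::function<void()> handler) {
    std::lock_guard<std::mutex> guard(mutex_);
    deferred_.push_back(std::move(handler));
  }

  // Report that one wrapped handler has finished.
  void complete() {
    std::vector<std::function<void()> > ready;
    {
      std::lock_guard<std::mutex> guard(mutex_);
      if (required_ > 0) --required_;
      if (required_ > 0) return;  // Criteria not met yet.
      // Swap under the lock so the handlers run without holding the mutex.
      ready.swap(deferred_);
    }
    for (std::size_t i = 0; i < ready.size(); ++i) ready[i]();
  }

private:
  std::mutex mutex_;
  std::size_t required_;
  std::vector<std::function<void()> > deferred_;
};

// Two operations "a" and "b" complete in order; "c" is the deferred handler.
// required == 2 behaves like all(a, b), required == 1 like any(a, b).
inline std::string mini_link_example(std::size_t required) {
  assert(required == 1 || required == 2);
  std::string log;
  mini_link link(required);
  link.post([&log] { log += 'c'; });
  log += 'a'; link.complete();  // a finished
  log += 'b'; link.complete();  // b finished
  return log;
}
```

In this sketch mini_link_example(2) yields "abc" and mini_link_example(1) yields "acb": with a required count of 1 the deferred handler is released as soon as the first operation completes, and later completions find nothing left to run.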

Here are some basic to advanced examples using the above code with two threads. My notation:

  • a -> b expresses a then b
  • (a | b) expresses a or b. Thus (a | b) -> c implies when either a or b finish, then run c.
  • (a & b) expresses a and b. Thus (a & b) -> c implies when both a and b finish, then run c.

Before each case, I print the chain's notation. Additionally, each function will print an uppercase letter when entering, and a lowercase letter when exiting.

#include <cctype>
#include <iostream>
#include <boost/asio.hpp>
#include <boost/assign.hpp>
#include <boost/thread.hpp>
#include "async_ops.hpp"

/// @brief Print identifiers when entering and exiting scope,
///        sleeping between.
void print_and_sleep(char id, unsigned int sleep_time)
{
  std::cout << char(toupper(id));
  boost::this_thread::sleep_for(boost::chrono::milliseconds(sleep_time));
  std::cout << char(tolower(id));
  std::cout.flush();
}

/// @brief Convenience function to create functors.
boost::function<void()> make_fn(char id, unsigned int sleep_time)
{
  return boost::bind(&print_and_sleep, id, sleep_time);  
}

/// @brief Run an io_service with multiple threads.
void run_service(boost::asio::io_service& io_service)
{
  boost::thread_group threads;
  threads.create_thread(boost::bind(
    &boost::asio::io_service::run, &io_service));
  io_service.run();
  threads.join_all();
}

int main()
{
  boost::function<void()> a = make_fn('a', 500);
  boost::function<void()> b = make_fn('b', 1000);
  boost::function<void()> c = make_fn('c', 500);
  boost::function<void()> d = make_fn('d', 1000);
  boost::function<void()> e = make_fn('e', 500);

  {
    std::cout << "a -> b -> c\n"
                 "  ";
    boost::asio::io_service io_service;
    async(io_service, a)
      .then(b)
      .then(c);
    run_service(io_service);
    std::cout << std::endl;
  }

  {
    std::cout << "(a & b) -> c\n"
                 "  ";
    boost::asio::io_service io_service;
    when_all(io_service, boost::assign::list_of(a)(b))
      .then(c); 
    run_service(io_service);
    std::cout << std::endl;
  }

  {
    std::cout << "(a | b) -> c\n"
                 "  ";
    boost::asio::io_service io_service;
    when_any(io_service, boost::assign::list_of(a)(b))
      .then(c); 
    run_service(io_service);
    std::cout << std::endl;
  }

  {
    std::cout << "(a & b) -> (c & d)\n"
                 "  ";
    boost::asio::io_service io_service;
    when_all(io_service, boost::assign::list_of(a)(b))
      .all(boost::assign::list_of(c)(d));
    run_service(io_service);
    std::cout << std::endl;
  }

  {
    std::cout << "(a & b) -> c -> (d & e)\n"
                 "  ";
    boost::asio::io_service io_service;
    when_all(io_service, boost::assign::list_of(a)(b))
      .then(c)
      .all(boost::assign::list_of(d)(e));
    run_service(io_service);
    std::cout << std::endl;
  }

  std::cout << "(a & b) -> (c & d) -> e\n"
               "  ";
  {
    boost::asio::io_service io_service;
    when_all(io_service, boost::assign::list_of(a)(b))
      .all(boost::assign::list_of(c)(d))
      .then(e);
    run_service(io_service);
    std::cout << std::endl;
  }

  std::cout << "(a | b) -> (c | d) -> e\n"
               "  ";
  {
    boost::asio::io_service io_service;
    when_any(io_service, boost::assign::list_of(a)(b))
      .any(boost::assign::list_of(c)(d))
      .then(e);
    run_service(io_service);
    std::cout << std::endl;
  }

  std::cout << "(a | b) -> (c & d) -> e\n"
               "  ";
  {
    boost::asio::io_service io_service;
    when_any(io_service, boost::assign::list_of(a)(b))
      .all(boost::assign::list_of(c)(d))
      .then(e);
    run_service(io_service);
    std::cout << std::endl;
  }

  {
    std::cout << "a -> ((b -> d) | c) -> e\n"
                 "  ";
    boost::asio::io_service io_service;
    async(io_service, a)
      .any(boost::assign::list_of
           (async(io_service, b).then(d))
           (async(inherit_service, c)))
      .then(e);
    run_service(io_service);
    std::cout << std::endl;
  }
}

This produces the following output:

a -> b -> c
  AaBbCc
(a & b) -> c
  ABabCc
(a | b) -> c
  ABaCbc
(a & b) -> (c & d)
  ABabCDcd
(a & b) -> c -> (d & e)
  ABabCcDEed
(a & b) -> (c & d) -> e
  ABabCDcdEe
(a | b) -> (c | d) -> e
  ABaCbDcEed
(a | b) -> (c & d) -> e
  ABaCbDcdEe
a -> ((b -> d) | c) -> e
  AaBCcEbDed
Tanner Sansbury answered Sep 25 '22 00:09