Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How resume the execution of a stackful coroutine in the context of its strand?

using Yield = asio::yield_context;
using boost::system::error_code;
int Func(Yield yield) {
  error_code ec;
  asio::detail::async_result_init<Yield, void(error_code, int)> init(yield[ec]);
  std::thread th(std::bind(Process, init.handler));
  int result = init.result.get();  // <--- yield at here
  return result;
}

How to implement Process so that Func will resumed in the context of the strand that Func was originally spawned on?

like image 638
updogliu Avatar asked Dec 09 '22 05:12

updogliu


1 Answers

Boost.Asio uses a helper function, asio_handler_invoke, to provide a customization point for invocation strategies. For example, when a Handler has been wrapped by a strand, the invocation strategy will cause the handler to be dispatched through the strand upon invocation. As noted in the documentation, asio_handler_invoke should be invoked via argument-dependent lookup.

using boost::asio::asio_handler_invoke;
asio_handler_invoke(nullary_functor, &handler);

For stackful coroutines, there are various important details to take into consideration when yielding the coroutine and when invoking the handler_type associated with a yield_context to resume the coroutine:

  • If code is currently running in the coroutine, then it is within the strand associated with the coroutine. Essentially, a simple handler is wrapped by the strand that resumes the coroutine, causing execution to jump to the coroutine, blocking the handler currently in the strand. When the coroutine yields, execution jumps back to the strand handler, allowing it to complete.
  • While spawn() adds work to the io_service (a handler that will start and jump to the coroutine), the coroutine itself is not work. To prevent the io_service event loop from ending while a coroutine is outstanding, it may be necessary to add work to the io_service before yielding.
  • Stackful coroutines use a strand to help guarantee the coroutine yields before resume is invoked. Asio 1.10.6 / Boost 1.58 enabled being able to safely invoke the completion handler from within the initiating function. Prior versions required that the completion handler was not invoked from within the initiating function, as its invocation strategy would dispatch(), causing the coroutine to attempt resumption before being suspended.

Here is a complete example that accounts for these details:

#include <iostream>    // std::cout, std::endl
#include <chrono>      // std::chrono::seconds
#include <functional>  // std::bind
#include <thread>      // std::thread
#include <utility>     // std::forward
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>

template <typename CompletionToken, typename Signature>
using handler_type_t = typename boost::asio::handler_type<
  CompletionToken, Signature>::type;

template <typename Handler>
using async_result = boost::asio::async_result<Handler>;

/// @brief Helper type used to initialize the asnyc_result with the handler.
template <typename CompletionToken, typename Signature>
struct async_completion
{
  typedef handler_type_t<CompletionToken, Signature> handler_type;

  async_completion(CompletionToken&& token)
    : handler(std::forward<CompletionToken>(token)),
      result(handler)
  {}

  handler_type handler;
  async_result<handler_type> result;
};

template <typename Signature, typename CompletionToken>
typename async_result<
  handler_type_t<CompletionToken, Signature>
>::type
async_func(CompletionToken&& token, boost::asio::io_service& io_service)
{
  // The coroutine itself is not work, so guarantee the io_service has
  // work.
  boost::asio::io_service::work work(io_service);

  // Initialize the async completion handler and result.
  async_completion<CompletionToken, Signature> completion(
      std::forward<CompletionToken>(token));

  auto handler = completion.handler;
  std::cout << "Spawning thread" << std::endl;
  std::thread([](decltype(handler) handler)
    {
      // The handler will be dispatched to the coroutine's strand.
      // As this thread is not running within the strand, the handler
      // will actually be posted, guaranteeing that yield will occur
      // before the resume.
      std::cout << "Resume coroutine" << std::endl;
      using boost::asio::asio_handler_invoke;
      asio_handler_invoke(std::bind(handler, 42), &handler);
    }, handler).detach();

  // Demonstrate that the handler is serialized through the strand by
  // allowing the thread to run before suspending this coroutine.
  std::this_thread::sleep_for(std::chrono::seconds(2));

  // Yield the coroutine.  When this yields, execution transfers back to
  // a handler that is currently in the strand.  The handler will complete
  // allowing other handlers that have been posted to the strand to run.
  std::cout << "Suspend coroutine" << std::endl;
  return completion.result.get();
}

int main()
{
  boost::asio::io_service io_service;

  boost::asio::spawn(io_service,
    [&io_service](boost::asio::yield_context yield)
    {
      auto result = async_func<void(int)>(yield, io_service);
      std::cout << "Got: " << result << std::endl;
    });

  std::cout << "Running" << std::endl;
  io_service.run();
  std::cout << "Finish" << std::endl;
}

Output:

Running
Spawning thread
Resume coroutine
Suspend coroutine
Got: 42
Finish

For much more details, please consider reading Library Foundations for Asynchronous Operations. It provides much greater detail into the composition of asynchronous operations, how Signature affects async_result, and the overall design of async_result, handler_type, and async_completion.

like image 108
Tanner Sansbury Avatar answered Dec 22 '22 01:12

Tanner Sansbury