
Using boost::asio stackless coroutines to download several files via HTTP

I translated the example from Programming in Lua by Roberto Ierusalimschy, which downloads several files via HTTP using coroutines, to C++ using boost::asio and stackful coroutines. Here is the code:

#include <chrono>
#include <iostream>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>

using namespace std;
using namespace boost::asio;

io_service ioService;

void download(const string& host, const string& file, yield_context& yield)
{
  clog << "Downloading " << host << file << " ..." << endl;

  size_t fileSize = 0;
  boost::system::error_code ec;

  ip::tcp::resolver resolver(ioService);

  ip::tcp::resolver::query query(host, "80");
  auto it = resolver.async_resolve(query, yield[ec]);

  ip::tcp::socket socket(ioService);
  socket.async_connect(*it, yield[ec]);

  ostringstream req;
  req << "GET " << file << " HTTP/1.0\r\n\r\n";
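  // Write asynchronously as well, so the coroutine yields instead of blocking.
  const string request = req.str();
  async_write(socket, buffer(request), yield[ec]);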

  while (true)
  {
    char data[8192];
    size_t bytesRead = socket.async_read_some(buffer(data), yield[ec]);
    if (0 == bytesRead) break;
    fileSize += bytesRead;
  }

  socket.shutdown(ip::tcp::socket::shutdown_both);
  socket.close();

  clog << file << " size: " << fileSize << endl;
}

int main()
{
  auto timeBegin = chrono::high_resolution_clock::now();

  vector<pair<string, string>> resources =
  {
    {"www.w3.org", "/TR/html401/html40.txt"},
    {"www.w3.org", "/TR/2002/REC-xhtml1-20020801/xhtml1.pdf"},
    {"www.w3.org", "/TR/REC-html32.html"},
    {"www.w3.org", "/TR/2000/REC-DOM-Level-2-Core-20001113/DOM2-Core.txt"},
  };

  for(const auto& res : resources)
  {
    spawn(ioService, [&res](yield_context yield)
    {
      download(res.first, res.second, yield);
    });
  }

  ioService.run();

  auto timeEnd = chrono::high_resolution_clock::now();

  clog << "Time: " << chrono::duration_cast<chrono::milliseconds>(
            timeEnd - timeBegin).count() << endl;

  return 0;
}

Now I'm trying to translate the code to use the stackless coroutines from boost::asio, but the documentation is not enough for me to grok how to organize the code in such a way that this works. Can someone provide a solution?

asked Aug 15 '16 at 15:08 by bobeff


1 Answer

Here is a solution based on the stackless coroutines provided by Boost.Asio. They are essentially a macro-based hack built on a switch statement, so I would not consider the solution particularly elegant. It could probably be done more cleanly with C++20 coroutines, but I think that is outside the scope of this question.

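#include <algorithm>
#include <chrono>
#include <exception>
#include <functional>
#include <iostream>
#include <iterator>
#include <memory>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

#include <boost/asio.hpp>
#include <boost/asio/yield.hpp>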

using boost::asio::async_write;
using boost::asio::buffer;
using boost::asio::error::eof;
using boost::system::error_code;

using std::placeholders::_1;
using std::placeholders::_2;

/**
 * Stackless coroutine for downloading file from host.
 *
 * The lifetime of the object is limited to one () call. After that,
 * the object will be copied and the old object is discarded. For this
 * reason, the socket_ and resolver_ members are stored as shared_ptrs,
 * so that they can live as long as there is a live copy. An alternative
 * solution would be to manage these objects outside of the coroutine
 * and to pass them here by reference.
 */
class downloader : boost::asio::coroutine {

  using socket_t = boost::asio::ip::tcp::socket;
  using resolver_t = boost::asio::ip::tcp::resolver;

public:
  downloader(boost::asio::io_service &service, const std::string &host,
             const std::string &file)
      : socket_{std::make_shared<socket_t>(service)},
        resolver_{std::make_shared<resolver_t>(service)}, file_{file},
        host_{host} {}

  void operator()(error_code ec = error_code(), std::size_t length = 0,
                  const resolver_t::results_type &results = {}) {

    // Check if the last yield resulted in an error.
    if (ec) {
      if (ec != eof) {
        throw boost::system::system_error{ec};
      }
    }

    // Jump to after the previous yield.
    reenter(this) {

      yield {
        resolver_t::query query{host_, "80"};

        // Use bind to skip the length parameter not provided by async_resolve
        auto result_func = std::bind(&downloader::operator(), this, _1, 0, _2);

        resolver_->async_resolve(query, result_func);
      }

      yield socket_->async_connect(*results, *this);

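      yield {
        // async_write only starts the operation, so the request text must
        // outlive this call. A local buffer would dangle; the text is kept
        // in a member shared by all copies instead.
        std::ostringstream req;
        req << "GET " << file_ << " HTTP/1.0\r\n\r\n";
        *request_ = req.str();
        async_write(*socket_, buffer(*request_), *this);
      }

      while (true) {
        // The read buffer must also stay valid until the handler runs.
        yield socket_->async_read_some(buffer(*data_), *this);

        if (length == 0) {
          break;
        }

        fileSize_ += length;
      }

      std::cout << file_ << " size: " << fileSize_ << std::endl;

      socket_->shutdown(socket_t::shutdown_both);
      socket_->close();
    }

    // Uncomment this to show progress and to demonstrate interleaving
    // std::cout << file_ << " size: " << fileSize_ << std::endl;
  }

private:
  std::shared_ptr<socket_t> socket_;
  std::shared_ptr<resolver_t> resolver_;

  // Buffers handed to asynchronous operations, shared between copies.
  std::shared_ptr<std::string> request_{std::make_shared<std::string>()};
  std::shared_ptr<std::vector<char>> data_{
      std::make_shared<std::vector<char>>(8192)};

  const std::string file_;
  const std::string host_;
  size_t fileSize_{};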
};

int main() {
  auto timeBegin = std::chrono::high_resolution_clock::now();

  try {
    boost::asio::io_service service;

    std::vector<std::pair<std::string, std::string>> resources = {
        {"www.w3.org", "/TR/html401/html40.txt"},
        {"www.w3.org", "/TR/2002/REC-xhtml1-20020801/xhtml1.pdf"},
        {"www.w3.org", "/TR/REC-html32.html"},
        {"www.w3.org", "/TR/2000/REC-DOM-Level-2-Core-20001113/DOM2-Core.txt"},
    };

    std::vector<downloader> downloaders{};
    std::transform(resources.begin(), resources.end(),
                   std::back_inserter(downloaders), [&](auto &x) {
                     return downloader{service, x.first, x.second};
                   });

    std::for_each(downloaders.begin(), downloaders.end(),
                  [](auto &dl) { dl(); });

    service.run();

  } catch (std::exception &e) {
    std::cerr << "exception: " << e.what() << "\n";
  }

  auto timeEnd = std::chrono::high_resolution_clock::now();

  std::cout << "Time: "
            << std::chrono::duration_cast<std::chrono::milliseconds>(timeEnd -
                                                                     timeBegin)
                   .count()
            << std::endl;

  return 0;
}

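Compiled with Boost 1.72 and g++ -lboost_coroutine -lpthread test.cpp. (The stackless coroutine headers are header-only, so -lboost_coroutine should not actually be required for this program.) Example output: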

$ ./a.out 
/TR/REC-html32.html size: 606
/TR/html401/html40.txt size: 629
/TR/2002/REC-xhtml1-20020801/xhtml1.pdf size: 115777
/TR/2000/REC-DOM-Level-2-Core-20001113/DOM2-Core.txt size: 229699
Time: 1644

The commented-out log line at the end of operator() can be enabled to show progress after every step, which demonstrates how the downloads interleave.

answered Sep 25 '22 at 23:09 by f9c69e9781fa194211448473495534