Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Using an analog of boost::asio::awaitable in boost::cobalt

SocketWrapper class has asyncRead that reads either from boost::asio::ssl::stream<boost::asio::ip::tcp::socket> or from boost::asio::ip::tcp::socket depending on isSSL() flag:

#pragma once

#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/cobalt.hpp>

class SocketWrapper
{
private:

    using SslStream = boost::asio::ssl::stream<boost::asio::ip::tcp::socket>;

public:

    template <size_t read_length>
    boost::cobalt::promise<size_t> asyncRead(uint8_t (&data)[read_length])
    {
        namespace asio = boost::asio;
        namespace cobalt = boost::cobalt;

        auto buf = asio::buffer(data, read_length);

        if (isSSL()) {
            co_return co_await stream().async_read_some(buf, cobalt::use_op);
        }
        else {
            co_return co_await underlyingSocket().async_read_some(buf, cobalt::use_op);
        }
    }

private:

    bool isSSL() const;

    const boost::asio::ip::tcp::socket& underlyingSocket() const;
    boost::asio::ip::tcp::socket& underlyingSocket();

    const SslStream& stream() const;
    SslStream& stream();
};

asyncRead is called from a function like this:

cobalt::promise<void> someRead(SocketWrapper& from)
{
    try
    {
        uint8_t buf[8192];
        while (true)
        {
            std::size_t n = co_await from.asyncRead(buf);

            // other async operations with co_await
        }
    }
    catch (std::exception& e)
    {
        std::cerr << "handle_client exception: " << e.what() << "\n";
    }
}

It is it possible to change the implementation asyncRead so that it will use return instead of co_return co_await and look like this:

template <size_t read_length>
some_awaitable_type<size_t> asyncRead(uint8_t (&data)[read_length])
{
    namespace asio = boost::asio;
    namespace cobalt = boost::cobalt;

    auto buf = asio::buffer(data, read_length);

    if (isSSL()) {
        return stream().async_read_some(buf, cobalt::use_op);
    }
    else {
        return underlyingSocket().async_read_some(buf, cobalt::use_op);
    }
}

So that someRead is still compiled?

For example, I was able to to this with boost::asio::awaitable, but not with boost::cobalt:

template <size_t read_length>
boost::asio::awaitable<size_t> asyncRead(uint8_t (&data)[read_length])
{
    namespace asio = boost::asio;

    auto buf = asio::buffer(data, read_length);

    if (isSSL()) {
        return stream().async_read_some(buf, boost::asio::use_awaitable);
    }
    else {
        return underlyingSocket().async_read_some(buf, boost::asio::use_awaitable);
    }
}    

And the code below:

template <size_t read_length>
boost::cobalt::promise<size_t> asyncRead(uint8_t (&data)[read_length])
{
    namespace asio = boost::asio;
    namespace cobalt = boost::cobalt;

    auto buf = asio::buffer(data, read_length);

    if (isSSL()) {
        return stream().async_read_some(buf, cobalt::use_op);
    }
    else {
        return underlyingSocket().async_read_some(buf, cobalt::use_op);
    }
}

Fails to compile with GCC with the following errors:

error: could not convert ‘boost::asio::ssl::stream::async_read_some(const MutableBufferSequence&, ReadToken&&) [with MutableBufferSequence = boost::asio::mutable_buffer; ReadToken = const boost::cobalt::use_op_t&; Stream = boost::asio::basic_stream_socketboost::asio::ip::tcp; decltype (async_initiate<ReadToken, void(boost::system::error_code, std::size_t)>(declval<initiate_async_read_some>(), token, buffers)) = boost::asio::async_result<boost::cobalt::use_op_t, void(boost::system::error_code, long unsigned int)>::op_impl<boost::asio::ssl::stream<boost::asio::basic_stream_socketboost::asio::ip::tcp ::initiate_async_read_some, boost::asio::mutable_buffer>; std::size_t = long unsigned int](buf, boost::cobalt::use_op)’ from ‘boost::asio::async_result<boost::cobalt::use_op_t, void(boost::system::error_code, long unsigned int)>::op_impl<boost::asio::ssl::stream<boost::asio::basic_stream_socketboost::asio::ip::tcp ::initiate_async_read_some, boost::asio::mutable_buffer>’ to ‘boost::cobalt::promise’

I can't say for sure what exactly I want to achieve, maybe I want the AsyncRead method to be non-routine and return some light-weight awaitable object, but not a task, or maybe I want to learn more about boost::cobalt.

like image 742
Dmitriano Avatar asked Nov 01 '25 02:11

Dmitriano


1 Answers

For example, I was able to to this with boost::asio::awaitable, but not with boost::cobalt

That's exactly the answer. Of course, as long as you insist on cobalt::promise you need to wrap it. There's not going to be a notable difference.

From the vague reference to "lightweight" I infer that you might be "afraid" of the extra cost of the await.

In practice, I'm assuming that cobalt::use_op uses the same optimizations available for asio::deferred (no extra coro frames required). The key point is that the coroutine frame implements promise_type::await_transform for the completion token specifically instead of creating a new coroutine (as would happen with asio::use_awaitable).

If you DO find in practice that you can measure a difference (highly unlikely unless you make a synthetic benchmark without actual IO objects involved), you might try to come from the reverse angle, and use Asio's promise implementation instead:

template <size_t read_length> cobalt::promise<size_t> asyncRead(uint8_t (&data)[read_length]) {
    auto buf = asio::buffer(data, read_length);

    auto op = isSSL() ? stream().async_read_some(buf, asio::experimental::use_promise)
                      : underlyingSocket().async_read_some(buf, asio::experimental::use_promise);

    co_return co_await std::move(op)(asio::deferred);
}

Note though, that all promise implementations typically use refcounted dynamic allocation, so wrapped promises are likely LESS optimal than the direct approach of suspending the coroutine directly using the use_op transform.

Self contained blarb:

Live On Compiler Explorer

#include <boost/asio.hpp>
#include <boost/asio/experimental/promise.hpp>
#include <boost/asio/experimental/use_promise.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/beast.hpp>
#include <boost/cobalt.hpp>
#include <iostream>
namespace asio   = boost::asio;
namespace cobalt = boost::cobalt;
using asio::ip::tcp;

class SocketWrapper {
  private:
    using SslStream = boost::asio::ssl::stream<tcp::socket>;

  private:
    bool isSSL() const { return false; }

    auto& underlyingSocket(this auto& This) {
        return boost::beast::get_lowest_layer(This.stream());
    }

    SslStream& stream() {
        using asio::ssl::context;
        static context   ctx(context::method::tlsv13);
        static SslStream s(asio::system_executor(), ctx);
        return s;
    };
    SslStream const& stream() const { return const_cast<SocketWrapper*>(this)->stream(); };

  public:
    template <size_t read_length> cobalt::promise<size_t> asyncRead(uint8_t (&data)[read_length]) {
        auto buf = asio::buffer(data, read_length);

        auto op = isSSL() ? stream().async_read_some(buf, asio::experimental::use_promise)
                          : underlyingSocket().async_read_some(buf, asio::experimental::use_promise);

        co_return co_await std::move(op)(asio::deferred);
    }
};

// asyncRead is called from a function like this:

cobalt::promise<void> someRead(SocketWrapper& from) {
    try {
        uint8_t buf[8192];
        while (true) {
            [[maybe_unused]] size_t n = co_await from.asyncRead(buf);

            // other async operations with co_await
        }
    } catch (std::exception& e) {
        std::cerr << "handle_client exception: " << e.what() << "\n";
    }
}

cobalt::main co_main(int, char*[]) { co_return 0; }
like image 62
sehe Avatar answered Nov 02 '25 16:11

sehe