Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Correct way to read & write data to boost::beast::websocket::stream

When we write data to plain socket or ssl::stream, it is recommended to use message queue to send data. In this case, I store messages in queue of type std::queue<std::string> and generally use following implementation:

template<typename socket_t>
class io : public std::enable_shared_from_this<io<socket_t>>
{
public:
    io(socket_t socket) : m_socket(std::move(socket)) {}

    void start()
    {
        read();
    }

    void write(std::string data)
    {
        bool write_in_progress = !m_write_queue.empty();
        m_write_queue.push(std::move(data));
        if (!write_in_progress)
        {
            do_write();
        }
    }

    void close()
    {
        m_socket.close();
    }

private:
    socket_t m_socket;
    std::array<char, 4096> m_buffer;
    std::queue<std::string> m_write_queue;

    void read()
    {
        auto self = this->shared_from_this();

        m_socket.async_read_some(boost::asio::buffer(m_buffer),
            [this, self](boost::system::error_code ec, std::size_t bytes_transferred) 
        {
            if (!ec)
            {
                std::string data(m_buffer.data(), bytes_transferred);
                read();
            }
        });
    }

    void do_write()
    {
        auto self = this->shared_from_this();

        boost::asio::async_write(m_socket, boost::asio::buffer(m_write_queue.front().c_str(), m_write_queue.front().size()),
            [this, self](boost::system::error_code ec, std::size_t /*length*/)
        {
            if (!ec)
            {
                m_write_queue.pop();
                if (!m_write_queue.empty())
                {
                    do_write();
                }
            }
        });
    }
};

My question is, how to write data to boost::beast::websocket::stream asynchronously? Should I create a queue where each element is boost::beast::flat_buffer or do it some other way?

Ideally I would like to use a std::queue<std::string> queue, but I don't know how exactly to do this.

I will also be glad if someone gives an example of correct reading of data from boost::beast::websocket::stream. I think for reading I need to use boost::beast::flat_buffer. However I don't know if I need to clear this buffer after each read operation if this buffer is a class field?

like image 332
Joe J Avatar asked Feb 26 '26 04:02

Joe J


1 Answers

To answer the last part first: the flat_buffer models the dynamic buffer concept.

You can use its direct interface to prepare(), commit() new content or consume() available data.

That said, here's a demo implementation that shows how to write this for a WS stream instead of a socket:

Live On Coliru

#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <deque>
#include <iostream>

namespace asio      = boost::asio;
namespace beast     = boost::beast;
namespace websocket = beast::websocket;
using asio::ip::tcp;

class io : public std::enable_shared_from_this<io> {
  public:
    using error_code = beast::error_code;
    using socket_t   = websocket::stream<tcp::socket>;
    io(socket_t socket) : m_ws(std::move(socket)) {}

    void start() { do_handshake(); }

    void queue_write(std::string data) {
        asio::post(m_ws.get_executor(), //
                   [this, m = std::move(data), self = shared_from_this()]() mutable {
                       m_outgoing.push_back(std::move(m));
                       if (m_outgoing.size() == 1) // if this is the first write, start the write loop
                           do_write_loop();
                   });
    }

    // void close() { beast::beast_close_socket(beast::get_lowest_layer(m_socket)); }
    void stop() {
        asio::post(m_ws.get_executor(), [this, self = shared_from_this()]() { //
            m_ws.async_close(websocket::close_code::going_away, asio::detached);
        });
    }

  private:
    socket_t                m_ws;
    std::string             m_incoming;
    std::deque<std::string> m_outgoing;

    // beast weirdly requires dynamic buffers to be lvalues; asio does not 
    asio::dynamic_string_buffer<char, std::char_traits<char>, std::allocator<char>> m_incoming_buffer =
        asio::dynamic_buffer(m_incoming);

    // all private methods are called from the strand
    void do_handshake() {
        m_ws.async_accept([this, self= shared_from_this()](error_code ec) {
            if (ec) {
                std::cerr << "Handshake failed: " << ec.message() << std::endl;
                return;
            }

            do_read_loop();  // start reading
            do_write_loop(); // just in case writes are queued before handshake
        });
    }

    void do_read_loop() {
        m_ws.async_read( //
            m_incoming_buffer, [this, self = shared_from_this()](error_code ec, size_t n) {
                if (ec) { // TODO partial reads and eof?
                    std::cerr << "Error reading: " << ec.message() << std::endl;
                    return;
                }

                // examples of processing:
#if 1
                // directly clearing m_incoming
                std::string processing  = std::move(m_incoming);
                std::cout << "Received: " << quoted(processing) << std::endl;
#else
                // or consume (parts of) the buffer
                m_incoming_buffer.consume(n);
#endif

                do_read_loop();
            });
    }

    void do_write_loop() {
        if (m_outgoing.empty())
            return;

        m_ws.async_write( //
            asio::buffer(m_outgoing.front()), [this, self = shared_from_this()](error_code ec, size_t /*n*/) {
                if (ec) {
                    std::cerr << "Error writing: " << ec.message() << std::endl;
                    return;
                }

                m_outgoing.pop_front();
                do_write_loop();
            });
    }
};

using std::this_thread::sleep_for;
using namespace std::chrono_literals;

int main() try {
    asio::thread_pool ioc;

    auto conn = std::make_shared<io>(        //
        websocket::stream<tcp::socket>(      //
            tcp::acceptor(ioc, {{}, 7878})   //
                .accept(make_strand(ioc)))); //

    conn->start();                         

    sleep_for(500ms);

    conn->queue_write("hello\n");
    conn->queue_write("world\n");

    sleep_for(500ms);

    conn->stop();

    ioc.join();
} catch (std::exception const& e) {
    std::cerr << "Exception: " << e.what() << std::endl;
}

With a live demo:

like image 62
sehe Avatar answered Feb 27 '26 20:02

sehe