
Boost beast::websocket callback functions

I am experimenting with the Boost beast::websocket websocket_client_async.cpp example, in conjunction with websocket_server_async.cpp.

As given, the client example just makes a connection, sends a string to the server (which simply echoes it back), prints the reply, closes, and exits.

I'm trying to modify the client to keep the session alive, so that I can repeatedly send/receive strings. So, whereas the example code's on_handshake function immediately sends the string via ws_.async_write(...), I separate that out into its own write(...) function.

Here is my modified session class:

using tcp = boost::asio::ip::tcp;
namespace websocket = boost::beast::websocket;

void fail(boost::system::error_code ec, char const* what)
{
    std::cerr << what << ": " << ec.message() << "\n";
}

// Sends a WebSocket message and prints the response
class session : public std::enable_shared_from_this<session>
{
    tcp::resolver resolver_;
    websocket::stream<tcp::socket> ws_;
    std::atomic<bool> io_in_progress_;
    boost::beast::multi_buffer buffer_;
    std::string host_;

public:
    // Resolver and socket require an io_context
    explicit session(boost::asio::io_context& ioc) : resolver_(ioc), ws_(ioc) {
        io_in_progress_ = false;
    }

    bool io_in_progress() const {
        return io_in_progress_;
    }

    // +---------------------+
    // | The "open" sequence |
    // +---------------------+
    void open(char const* host, char const* port)
    {
        host_ = host;

        // Look up the domain name
        resolver_.async_resolve(host, port,
            std::bind( &session::on_resolve, shared_from_this(),
                std::placeholders::_1, std::placeholders::_2 )
        );
    }

    void on_resolve(boost::system::error_code ec, tcp::resolver::results_type results)
    {
        if (ec)
            return fail(ec, "resolve");

        boost::asio::async_connect(
            ws_.next_layer(), results.begin(), results.end(),
            std::bind( &session::on_connect, shared_from_this(),
                std::placeholders::_1 )
        );
    }

    void on_connect(boost::system::error_code ec)
    {
        if (ec)
            return fail(ec, "connect");

        ws_.async_handshake(host_, "/",
            std::bind( &session::on_handshake, shared_from_this(),
                std::placeholders::_1 )
        );
    }

    void on_handshake(boost::system::error_code ec)
    {
        if (ec)
            return fail(ec, "handshake");
        else {
            std::cout << "Successful handshake with server.\n";
        }
    }

    // +---------------------------+
    // | The "write/read" sequence |
    // +---------------------------+
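    void write(const std::string &text)
    {
        io_in_progress_ = true;
        // Copy the text into shared storage: async_write only references the
        // buffer, and the caller's string may be destroyed before the write completes.
        auto msg = std::make_shared<std::string>(text);
        auto self = shared_from_this();
        ws_.async_write(boost::asio::buffer(*msg),
            [self, msg](boost::system::error_code ec, std::size_t n) {
                self->on_write(ec, n);
            }
        );
    }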

    void on_write(boost::system::error_code ec, std::size_t bytes_transferred)
    {
        boost::ignore_unused(bytes_transferred);
        if (ec)
            return fail(ec, "write");

        ws_.async_read(buffer_,
            std::bind( &session::on_read, shared_from_this(),
                std::placeholders::_1, std::placeholders::_2 )
        );
    }

    void on_read(boost::system::error_code ec, std::size_t bytes_transferred)
    {
        io_in_progress_ = false; // end of write/read sequence
        boost::ignore_unused(bytes_transferred);
        if (ec)
            return fail(ec, "read");

        std::cout << boost::beast::buffers(buffer_.data()) << std::endl;
        // Clear the buffer so the next reply is not appended to this one
        buffer_.consume(buffer_.size());
    }

    // +----------------------+
    // | The "close" sequence |
    // +----------------------+
    void close()
    {
        io_in_progress_ = true;
        ws_.async_close(websocket::close_code::normal,
            std::bind( &session::on_close, shared_from_this(),
                std::placeholders::_1)
        );
    }

    void on_close(boost::system::error_code ec)
    {
        io_in_progress_ = false; // end of close sequence
        if (ec)
            return fail(ec, "close");

        std::cout << "Socket closed successfully.\n";
    }
};

The problem is that, while the connection works fine and I can send a string, the on_read callback is never hit (unless I do an ugly hack described below).

My main looks like this:

void wait_for_io(std::shared_ptr<session> psession, boost::asio::io_context &ioc)
{
    // Continually try to run the ioc until the callbacks are finally
    // triggered (as indicated by the session::io_in_progress_ flag)
    while (psession->io_in_progress()) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
        ioc.run();
    }
}

int main(int argc, char** argv)
{
    // Check command line arguments.
    if (argc != 3) {
        std::cerr << "usage info goes here...\n";
        return EXIT_FAILURE;
    }
    const char *host = argv[1], *port = argv[2];

    boost::asio::io_context ioc;
    std::shared_ptr<session> p = std::make_shared<session>(ioc);
    p->open(host, port);
    ioc.run(); // This works. Connection is established and all callbacks are executed.

    p->write("Hello world"); // String is sent & received by server,
                             // even before calling ioc.run()
                             // However, session::on_read callback is never
                             // reached.

    ioc.run();               // This seems to be ignored and returns immediately,
    wait_for_io(p, ioc);     // <-- so this hack is necessary

    p->close();              // session::on_close is never reached
    ioc.run();               // Again, this seems to be ignored and returns immediately, so
    wait_for_io(p, ioc);     // <-- this is necessary

    return EXIT_SUCCESS;
}

If I do this:

p->write("Hello world");
while(1) {
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
}

I can confirm that the string is sent to and received by the server (see note 1), and that the session::on_read callback is not reached.

Same thing occurs with p->close().

But if I add my weird wait_for_io() function, everything works. I'm positive this is a terrible hack, but I can't figure out what's going on.

1 Note: I can confirm that the message does reach the server, as I modified the server example to print any received strings to the console. This was the only modification I made. The echo-to-client functionality was not changed.

Blair Fonville asked Apr 25 '18 20:04


2 Answers

io_context::run will only return when there is no more pending work. If you simply make sure that there is a pending call to websocket::stream::async_read active at all times, then run will never return and the hacks will not be needed. Also, you will receive all messages sent by the server.
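
To illustrate the idea without a network connection, here is a minimal sketch using a toy event loop. ToyLoop and ToyReader are hypothetical names invented for this illustration and are not part of Boost; the only point is that a handler which re-arms itself (as on_read could by starting the next async_read) keeps run() from returning.

```cpp
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical stand-in for an event loop; NOT Boost.Asio's implementation.
class ToyLoop {
    std::deque<std::function<void()>> pending_;
public:
    // Queue a handler, loosely analogous to starting an async operation.
    void post(std::function<void()> h) { pending_.push_back(std::move(h)); }

    // Run handlers until none are pending; returns how many ran.
    std::size_t run() {
        std::size_t n = 0;
        while (!pending_.empty()) {
            auto h = std::move(pending_.front());
            pending_.pop_front();
            h();   // the handler may call post() again
            ++n;
        }
        return n;
    }
};

// Models a reader that re-arms itself after each message. It stops after
// `limit` messages only so that the demonstration terminates.
struct ToyReader {
    ToyLoop& loop;
    int limit;
    int seen;

    ToyReader(ToyLoop& l, int lim) : loop(l), limit(lim), seen(0) {}

    void arm() { loop.post([this] { on_read(); }); }

    void on_read() {
        ++seen;
        if (seen < limit)
            arm();   // keep one "read" pending so run() has work left
    }
};
```

With a ToyReader limited to three messages, a single call to run() services all three reads, because each handler queues the next one before returning. In the real client the chain would not end until the connection closes or an error is reported.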

Vinnie Falco answered Nov 03 '22 11:11


The calls to io_context::run() after the first call (shown here):

boost::asio::io_context ioc;
std::shared_ptr<session> p = std::make_shared<session>(ioc);
p->open(host, port);
ioc.run(); // This works. Connection is established and all callbacks are executed.

do not work because io_context::restart() must be called before any subsequent call to io_context::run().

From the documentation:

io_context::restart

Restart the io_context in preparation for a subsequent run() invocation.

This function must be called prior to any second or later set of invocations of the run(), run_one(), poll() or poll_one() functions when a previous invocation of these functions returned due to the io_context being stopped or running out of work. After a call to restart(), the io_context object's stopped() function will return false.
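
The rule quoted above can be pictured with a small model. ToyContext below is a hypothetical class written only for this illustration; it is not how Boost.Asio is implemented, and it models nothing beyond the stopped flag described in the documentation.

```cpp
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical model of the documented rule; NOT Boost.Asio's implementation.
class ToyContext {
    std::deque<std::function<void()>> pending_;
    bool stopped_ = false;
public:
    // Queue a handler to be executed by run().
    void post(std::function<void()> h) { pending_.push_back(std::move(h)); }

    bool stopped() const { return stopped_; }

    // Clear the stopped flag so that a later run() does work again.
    void restart() { stopped_ = false; }

    // Runs queued handlers and returns how many ran. Once it runs out of
    // work it marks the context as stopped, so later calls return 0
    // immediately until restart() is called.
    std::size_t run() {
        std::size_t n = 0;
        while (!stopped_ && !pending_.empty()) {
            auto h = std::move(pending_.front());
            pending_.pop_front();
            h();
            ++n;
        }
        stopped_ = true;
        return n;
    }
};
```

In this model, a handler posted after the first run() is ignored by a second run() and only executes once restart() has been called. Applied to the main in the question, that corresponds to calling ioc.restart() before the second and third ioc.run().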

Blair Fonville answered Nov 03 '22 11:11