Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

AsyncOperation Reading and Writing

There is a test code for main, It's pretty simple.

void AddData(std::shared_ptr<std::queue<std::string>> stores)
{
    stores->push("A\n");
    stores->push("B\n");
    stores->push("C\n");
    stores->push("D\n");
    stores->push("E\n");
}

    
int main()
{
    std::shared_ptr<std::queue<std::string>> stores = std::make_shared<std::queue<std::string>>();
    AddData(stores);

    boost::asio::io_context io_context;
    ServerTools::Server srv(io_context, 15004);

    std::shared_ptr<ServerTools::session> sess = srv.accept_one();
    sess->SetStoreForServer(stores);
    sess->StartAsyncRead();
    sess->StartAsyncSend();

    io_context.run();
}

There is a function for asynchronous reading and asynchronous writing.

void ServerTools::session::StartAsyncSend()
{
    if (!p_messageStore->empty())
    {
        std::string data = p_messageStore->front();

        boost::asio::async_write(socket, boost::asio::buffer(data),         
        [this](boost::system::error_code ec, std::size_t bytes_transferred)
        {
            if (!ec) 
            {
                std::cout <<"Sending " << bytes_transferred << " bytes" << '\n';
                this->p_messageStore->pop();
            }
            else
            {
                std::cerr << "Mistake answer: " << ec.message() << std::endl;
            }

            std::this_thread::sleep_for(std::chrono::seconds(3));
            StartAsyncSend();
        });
    }
}

void ServerTools::session::StartAsyncRead()
{
    std::vector<char> buffer(100);   
    boost::asio::async_read(socket, boost::asio::buffer(buffer), [this](const boost::system::error_code& ec, std::size_t size_transferred)
    {
        if (!ec)
        {
            std::cout << "Received " << size_transferred << "bytes" << std::endl;
        }
        StartAsyncRead();
    });
}

If you run asynchronous reading only, then messages are read without problems and without delays, if you run asynchronous writing, then there are no problems either. But if you run both, then the socket write operation will be involved most of the time. And reading messages will occur much less frequently. It is clear that the code is usually organized differently in applications, but what is the reason for this behavior of the program? If there are ways to fix it, or is the only thing that can be done correctly to organize the sequence of asynchronous reads and writes?

like image 785
user29207775 Avatar asked Jan 26 '26 21:01

user29207775


1 Answers

That code looks almost normal. Except there's bugs because

  • you copy the outbound data into a local variable.
  • you also use a local variable for the async_read
  • you're using blocking sleep_for inside a completion handler. This is probably what prevents the execution context from making any progress when you are writing messages

Assuming you

  • use a container that has reference stability on the push/pop operations (like std::deque)
  • use a (implicit) strand to ensure all operations on shared data are synchronized (your question code is fine, unless you are hiding other threads being created elsewhere)
  • the socket is any of ASIO's builtin AsyncStream models (e.g. tcp::socket)

this pattern is 100% fine and should work as you expect. The only important things are:

  • prevent race conditions (easy when there's only a single thread)
  • don't have overlapping reads or overlapping writes (a single read and a single write operation can be in flight at any time)

Q. And reading messages will occur much less frequently

That completely and only depends on the traffic patterns of your application

BONUS

I'll try to make a fixed complete demo here for you to check notes.

Live On Coliru

#include <boost/asio.hpp>
#include <deque>
#include <iostream>
namespace asio = boost::asio;

namespace ServerTools {
    using namespace std::chrono_literals;
    using asio::ip::tcp;
    using boost::system::error_code;
    using Message      = std::string;
    using MessageQueue = std::deque<Message>;

    class Session : public std::enable_shared_from_this<Session> {
      public:
        Session(tcp::socket socket) : socket_(std::move(socket)) {}

        void Start(std::shared_ptr<MessageQueue> store) {
            SetStoreForServer(std::move(store));
            StartAsyncRead();
            StartAsyncSend();
        }

      private:
        void StartAsyncSend();
        void StartAsyncRead();
        void SetStoreForServer(std::shared_ptr<MessageQueue> store) { outgoing_ = std::move(store); }

        tcp::socket                   socket_;
        asio::steady_timer            timer_{socket_.get_executor()};
        Message                       incoming_;
        std::shared_ptr<MessageQueue> outgoing_;
    };

    class Server {
      public:
        Server(asio::any_io_executor ex, uint16_t port) : acceptor(ex, {{}, port}) {}

        std::shared_ptr<Session> accept_one() { return std::make_shared<Session>(acceptor.accept()); }

      private:
        tcp::acceptor acceptor;
    };

    void Session::StartAsyncSend() {
        if (!outgoing_->empty()) {
            Message const& data = outgoing_->front(); // no copy

            async_write(
                socket_, asio::buffer(data), [this, self = shared_from_this()](error_code ec, size_t xfr) {
                    if (!ec) {
                        std::cout << "Sending " << xfr << " bytes" << std::endl;
                        outgoing_->pop_front();

                        timer_.expires_after(3s);
                        timer_.async_wait([this, self = shared_from_this()](error_code ec) {
                            if (!ec)
                                StartAsyncSend();
                        });
                    } else {
                        std::cerr << "Mistake answer: " << ec.message() << std::endl;
                    }
                });
        }
    }

    void Session::StartAsyncRead() {
        async_read_until( //
            socket_, asio::dynamic_buffer(incoming_), '\n',
            [this, self = shared_from_this()](error_code ec, size_t xfr) {
                std::cout << "Received " << xfr << " bytes (" << ec.message() << ")" << std::endl;
                if (xfr)
                    std::cout << "Message: " << quoted(std::string_view(incoming_.data(), xfr - 1))
                              << std::endl;

                if (!ec) {
                    incoming_.erase(0, xfr);
                    StartAsyncRead();
                }
            });
    }

    void AddData(std::shared_ptr<MessageQueue> stores) {
        stores->push_back("A\n");
        stores->push_back("B\n");
        stores->push_back("C\n");
        stores->push_back("D\n");
        stores->push_back("E\n");
    }
} // namespace ServerTools

int main() {
    auto store = std::make_shared<ServerTools::MessageQueue>();

    ServerTools::AddData(store);

    asio::io_context    io_context;
    ServerTools::Server srv{io_context.get_executor(), 15004};

    srv.accept_one()->Start(store);

    io_context.run();
}

With a live demo:

like image 70
sehe Avatar answered Jan 28 '26 16:01

sehe



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!