asio::async_write incredibly difficult to synchronize on a high volume stream

I am currently using the Asio C++ library and wrote a client wrapper around it. My original approach was very basic and only needed to stream in a single direction. Requirements have changed, and I've switched over to using all asynchronous calls. Most of the migration has been easy except for asio::async_write(...). I have tried a few different approaches and inevitably run into a deadlock with each one.

The application streams data continuously at high volume. I have stayed away from strands because they do not block and can lead to memory issues, especially when the server is under heavy load: jobs back up and the application's heap grows indefinitely.

So I created a blocking queue, only to find out the hard way that using locks across callbacks and/or blocking on events leads to unpredictable behavior.

The wrapper is a very large class, so I will try to explain my landscape in its current state and hopefully get some good suggestions:

  • An asio::steady_timer that runs on a fixed schedule and pushes a heartbeat message directly into the blocking queue
  • A thread dedicated to reading events and pushing them into the blocking queue
  • A thread dedicated to consuming the blocking queue

For example, my queue has queue::block() and queue::unblock(), which are just thin wrappers around a condition variable and mutex.
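
For context, here is a minimal sketch of what those wrappers (plus the queue::stage_block() used below) might look like, assuming a hypothetical blocking_queue class that owns a mutex, a condition variable, and a blocked flag; these names are assumptions, not the original implementation:

#include <condition_variable>
#include <mutex>

class blocking_queue {
public:
    void stage_block() {
        std::lock_guard<std::mutex> lock(mutex);
        blocked = true;                   // arm the gate before starting the async call
    }

    void block() {
        std::unique_lock<std::mutex> lock(mutex);
        condition.wait(lock, [this]() { return !blocked; });  // wait until unblock() runs
    }

    void unblock() {
        {
            std::lock_guard<std::mutex> lock(mutex);
            blocked = false;
        }
        condition.notify_all();           // wake the blocked consumer thread
    }

private:
    std::mutex mutex;
    std::condition_variable condition;
    bool blocked = false;
};

The consumer thread then drives writes through these wrappers: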

std::thread consumer([this]() {
    std::string message_buffer;

    while (queue.pop(message_buffer)) {
        // Arm the block before the write starts so the unblock from the
        // completion handler cannot be missed.
        queue.stage_block();
        asio::async_write(*socket, asio::buffer(message_buffer),
            std::bind(&networking::handle_write, this, std::placeholders::_1, std::placeholders::_2));
        // Wait here until handle_write() calls queue.unblock().
        queue.block();
    }
});

void networking::handle_write(const std::error_code& error, size_t bytes_transferred) {
    // Release the consumer thread that is blocked in queue.block().
    queue.unblock();
}

When the socket backs up and the server can no longer accept data because of the current load, the queue fills up and leads to a deadlock where handle_write(...) is never called.

The other approach eliminates the consumer thread entirely and relies on handle_write(...) to pop the queue. Like so:

void networking::write(const std::string& data) {
    if (!queue.closed()) {
        std::stringstream stream_buffer;
        stream_buffer << data << std::endl;

        spdlog::get("console")->debug("pushing to queue {}", queue.size());

        queue.push(stream_buffer.str());

        // Only start a write chain if this was the first message in the queue;
        // otherwise handle_write() will pick it up.
        if (queue.size() == 1) {
            spdlog::get("console")->debug("handle_write: {}", stream_buffer.str());
            asio::async_write(*socket, asio::buffer(stream_buffer.str()),
                std::bind(&networking::handle_write, this, std::placeholders::_1, std::placeholders::_2));
        }
    }
}

void networking::handle_write(const std::error_code& error, size_t bytes_transferred) {
    std::string message;
    queue.pop(message);

    // If more messages are waiting, keep the write chain going.
    if (!queue.closed() && !queue.empty()) {
        std::string front = queue.front();

        asio::async_write(*socket, asio::buffer(queue.front()),
            std::bind(&networking::handle_write, this, std::placeholders::_1, std::placeholders::_2));
    }
}

This also resulted in a deadlock, and it obviously has other race problems as well. When I disabled my heartbeat callback, I had absolutely no issues; however, the heartbeat is a requirement.

What am I doing wrong? What is a better approach?

asked Nov 17 '22 by user0000001
1 Answer

It appears all my pain derived entirely from the heartbeat. Disabling the heartbeat in each variation of my asynchronous write operations seemed to cure my problems, which led me to believe the issue was a result of using the built-in asio::steady_timer and its async_wait(...).

Asio synchronizes its work internally, running one completion handler to completion before executing the next one on the thread servicing the io_context. Using asio::async_wait(...) on the steady_timer to build my heartbeat functionality was the design flaw, because the heartbeat handler ran on that same thread. When the heartbeat blocked on queue::push(...), it deadlocked Asio, which explains why the asio::async_write(...) completion handler never executed in my first example.
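
To illustrate the flaw, here is a hypothetical sketch of that kind of timer-driven heartbeat; schedule_heartbeat, heartbeat_timer, heartbeat_interval, and heartbeat_message are assumed names, not the original code:

void networking::schedule_heartbeat() {
    heartbeat_timer.expires_after(std::chrono::milliseconds(heartbeat_interval));
    heartbeat_timer.async_wait([this](const std::error_code& error) {
        if (!error) {
            // This handler runs on the io_context thread. If write(...) blocks on
            // queue::push(...) because the queue is full, the very thread that must
            // run the pending asio::async_write(...) completion handler is stuck,
            // and the whole pipeline deadlocks.
            write(heartbeat_message);
            schedule_heartbeat();
        }
    });
}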

The solution was to put the heartbeat on its own thread and let it work independently of Asio. I am still using my blocking queue to synchronize calls to asio::async_write(...), but have modified my consumer thread to use std::future and std::promise, which synchronizes the completion handler with my consumer thread cleanly (a sketch of that consumer follows the heartbeat code below).

std::thread networking::heartbeat_worker() {
    return std::thread([&]() {
        while (socket_opened) {
            spdlog::get("console")->trace("heartbeat pending");
            write(heartbeat_message);
            spdlog::get("console")->trace("heartbeat sent");

            // Sleep for the heartbeat interval, but wake immediately if the socket
            // closes so the thread can exit promptly.
            std::unique_lock<std::mutex> lock(mutex);
            socket_closed_event.wait_for(lock, std::chrono::milliseconds(heartbeat_interval), [&]() {
                return !socket_opened;
            });
        }

        spdlog::get("console")->trace("heartbeat thread exited gracefully");
    });
}
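
For completeness, here is a rough sketch of the modified consumer thread described above, using std::promise and std::future to hold the consumer until the completion handler fires. The write_promise and write_future names are assumptions; queue and socket are the same members used earlier, and <future> is required:

std::thread consumer([this]() {
    std::string message_buffer;

    while (queue.pop(message_buffer)) {
        std::promise<std::error_code> write_promise;
        std::future<std::error_code> write_future = write_promise.get_future();

        asio::async_write(*socket, asio::buffer(message_buffer),
            [&write_promise](const std::error_code& error, std::size_t bytes_transferred) {
                write_promise.set_value(error);
            });

        // Block the consumer thread (never the io_context thread) until the
        // completion handler fires; message_buffer stays alive for the whole write.
        std::error_code write_error = write_future.get();

        if (write_error) {
            spdlog::get("console")->error("write failed: {}", write_error.message());
            break;
        }
    }
});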
answered Dec 05 '22 by user0000001