I find myself writing code that basically looks like this:
using boost::system::error_code;
socket.async_connect(endpoint, [&](error_code Error)
{
if (Error)
{
print_error(Error);
return;
}
// Read header
socket.async_read(socket, somebuffer, [&](error_code Error, std::size_t N)
{
if (Error)
{
print_error(Error);
return;
}
// Read actual data
socket.async_read(socket, somebuffer, [&](error_code Error, std::size_t N)
{
// Same here...
});
});
};
So basically I'm nesting callbacks in callbacks in callbacks, while the logic is simple and "linear".
Is there a more elegant way of writing this, so that the code is both local and in-order?
One elegant solution is to use coroutines. Boost.Asio supports both stackless coroutines, which introduce a small set of pseudo-keywords, and stackful coroutines, which use Boost.Coroutine.
Stackless coroutines introduce a set of pseudo-keywords preprocessor macros, that implement a switch statement using a technique similar to Duff's Device. The documentation covers each of the keywords in detail.
The original problem (connect->read header->read body) might look something like the following when implemented with stackless coroutines:
struct session
: boost::asio::coroutine
{
boost::asio::ip::tcp::socket socket_;
std::vector<char> buffer_;
// ...
void operator()(boost::system::error_code ec = boost::system::error_code(),
std::size_t length = 0)
{
// In this example we keep the error handling code in one place by
// hoisting it outside the coroutine. An alternative approach would be to
// check the value of ec after each yield for an asynchronous operation.
if (ec)
{
print_error(ec);
return;
}
// On reentering a coroutine, control jumps to the location of the last
// yield or fork. The argument to the "reenter" pseudo-keyword can be a
// pointer or reference to an object of type coroutine.
reenter (this)
{
// Asynchronously connect. When control resumes at the following line,
// the error and length parameters reflect the result of
// the asynchronous operation.
yield socket_.async_connect(endpoint_, *this);
// Loop until an error or shutdown occurs.
while (!shutdown_)
{
// Read header data. When control resumes at the following line,
// the error and length parameters reflect the result of
// the asynchronous operation.
buffer_.resize(fixed_header_size);
yield socket_.async_read(boost::asio::buffer(buffer_), *this);
// Received data. Extract the size of the body from the header.
std::size_t body_size = parse_header(buffer_, length);
// If there is no body size, then leave coroutine, as an invalid
// header was received.
if (!body_size) return;
// Read body data. When control resumes at the following line,
// the error and length parameters reflect the result of
// the asynchronous operation.
buffer_.resize(body_size);
yield socket_.async_read(boost::asio::buffer(buffer_), *this);
// Invoke the user callback to handle the body.
body_handler_(buffer_, length);
}
// Initiate graceful connection closure.
socket_.shutdown(tcp::socket::shutdown_both, ec);
} // end reenter
}
}
Stackful coroutines are created using the spawn()
function. The original problem may look something like the following when implemented with stackful coroutines:
boost::asio::spawn(io_service, [&](boost::asio::yield_context yield)
{
boost::system::error_code ec;
boost::asio::ip::tcp::socket socket(io_service);
// Asynchronously connect and suspend the coroutine. The coroutine will
// be resumed automatically when the operation completes.
socket.async_connect(endpoint, yield[ec]);
if (ec)
{
print_error(ec);
return;
}
// Loop until an error or shutdown occurs.
std::vector<char> buffer;
while (!shutdown)
{
// Read header data.
buffer.resize(fixed_header_size);
std::size_t bytes_transferred = socket.async_read(
boost::asio::buffer(buffer), yield[ec]);
if (ec)
{
print_error(ec);
return;
}
// Extract the size of the body from the header.
std::size_t body_size = parse_header(buffer, bytes_transferred);
// If there is no body size, then leave coroutine, as an invalid header
// was received.
if (!body_size) return;
// Read body data.
buffer.resize(body_size);
bytes_transferred =
socket.async_read(boost::asio::buffer(buffer), yield[ec]);
if (ec)
{
print_error(ec);
return;
}
// Invoke the user callback to handle the body.
body_handler_(buffer, length);
}
// Initiate graceful connection closure.
socket.shutdown(tcp::socket::shutdown_both, ec);
});
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With