I'm trying to write a synchronous wrapper method around async_read to allow non-blocking reads on a socket. Following several examples from around the internet, I have developed a solution that seems almost right but does not work.
The class declares these relevant attributes and methods:
class communications_client
{
protected:
    boost::shared_ptr<boost::asio::io_service> _io_service;
    boost::shared_ptr<boost::asio::ip::tcp::socket> _socket;
    boost::array<boost::uint8_t, 128> _data;
    boost::mutex _mutex;
    bool _timeout_triggered;
    bool _message_received;
    boost::system::error_code _error;
    size_t _bytes_transferred;
    void handle_read(const boost::system::error_code & error, size_t bytes_transferred);
    void handle_timeout(const boost::system::error_code & error);
    size_t async_read_helper(unsigned short bytes_to_transfer, const boost::posix_time::time_duration & timeout, boost::system::error_code & error);
    ...
};
The method async_read_helper is the one that encapsulates all the complexity, while the other two, handle_read and handle_timeout, are just the event handlers. Here is the implementation of the three methods:
void communications_client::handle_timeout(const boost::system::error_code & error)
{
    if (!error)
    {
        _mutex.lock();
        _timeout_triggered = true;
        _error.assign(boost::system::errc::timed_out, boost::system::system_category());
        _mutex.unlock();
    }
}
void communications_client::handle_read(const boost::system::error_code & error, size_t bytes_transferred)
{
    _mutex.lock();
    _message_received = true;
    _error = error;
    _bytes_transferred = bytes_transferred;
    _mutex.unlock();
}
size_t communications_client::async_read_helper(unsigned short bytes_to_transfer, const boost::posix_time::time_duration & timeout, boost::system::error_code & error)
{
    _timeout_triggered = false;
    _message_received = false;
    boost::asio::deadline_timer timer(*_io_service);
    timer.expires_from_now(timeout);
    timer.async_wait(
        boost::bind(
            &communications_client::handle_timeout,
            this,
            boost::asio::placeholders::error));
    boost::asio::async_read(
        *_socket,
        boost::asio::buffer(_data, 128),
        boost::asio::transfer_exactly(bytes_to_transfer),
        boost::bind(
            &communications_client::handle_read,
            this,
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred));
    while (true)
    {
        _io_service->poll_one();
        if (_message_received)
        {
            timer.cancel();
            break;
        }
        else if (_timeout_triggered)
        {
            _socket->cancel();
            break;
        }
    }
    return _bytes_transferred;
}
The main question I have is: why does this work with a loop on _io_service->poll_one() but not without a loop and a single call to _io_service->run_one()? Also, I would like to know whether it looks correct to anyone who is more used to working with Boost and Asio. Thank you!
According to the comments by Jonathan Wakely, the loop could be replaced by a call to _io_service->run_one(), followed by a call to _io_service->reset() after the operations have finished. It should look like:
_io_service->run_one();
if (_message_received)
{
    timer.cancel();
}
else if (_timeout_triggered)
{
    _socket->cancel();
}
_io_service->reset();
After some testing, I found that this solution alone does not work. The handle_timeout method is called continuously with the error code operation_aborted. How can these calls be stopped?
The answer by twsansbury is accurate and well grounded in the documentation. That implementation leads to the following code within async_read_helper:
while (_io_service->run_one())
{
    if (_message_received)
    {
        timer.cancel();
    }
    else if (_timeout_triggered)
    {
        _socket->cancel();
    }
}
_io_service->reset();
and the following change to the handle_read method:
void communications_client::handle_read(const boost::system::error_code & error, size_t bytes_transferred)
{
    if (error != boost::asio::error::operation_aborted)
    {
        ...
    }
}
This solution has proved solid and correct during testing.
The main difference between io_service::run_one() and io_service::poll_one() is that run_one() blocks until a handler is ready to run, whereas poll_one() does not wait for any outstanding handlers to become ready.
Assuming the only outstanding handlers on _io_service are handle_timeout() and handle_read(), run_one() does not require a loop because it returns only once either handle_timeout() or handle_read() has run. On the other hand, poll_one() requires a loop because it returns immediately when neither handle_timeout() nor handle_read() is ready to run; without the loop, async_read_helper() would return before either handler had been invoked.
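To make the blocking difference concrete without a socket, here is a minimal sketch of a toy, single-threaded stand-in for an I/O service. It is not Boost.Asio: toy_service and start_operation are hypothetical names invented for this illustration, and only the blocking behaviour and return values of run_one() and poll_one() are imitated.

```cpp
#include <algorithm>
#include <chrono>
#include <cstddef>
#include <functional>
#include <thread>
#include <utility>
#include <vector>

// Toy model only (hypothetical, NOT Boost.Asio): each pending operation
// becomes "ready" once its deadline has passed.
class toy_service
{
public:
    using clock = std::chrono::steady_clock;

    // Register an operation whose handler becomes ready after `delay`.
    void start_operation(std::chrono::milliseconds delay,
                         std::function<void()> handler)
    {
        pending_.push_back({clock::now() + delay, std::move(handler)});
    }

    // Imitates poll_one(): runs at most one handler that is ready right now
    // and never waits. Returns the number of handlers run (0 or 1).
    std::size_t poll_one() { return run_earliest(false); }

    // Imitates run_one(): waits until a handler is ready, then runs it.
    // Returns 0 only when no operation is outstanding.
    std::size_t run_one() { return run_earliest(true); }

private:
    struct pending
    {
        clock::time_point ready_at;
        std::function<void()> handler;
    };

    std::size_t run_earliest(bool wait)
    {
        if (pending_.empty())
            return 0; // nothing outstanding, nothing to wait for

        auto it = std::min_element(
            pending_.begin(), pending_.end(),
            [](const pending & a, const pending & b)
            { return a.ready_at < b.ready_at; });

        if (clock::now() < it->ready_at)
        {
            if (!wait)
                return 0; // poll semantics: not ready yet, return at once
            std::this_thread::sleep_until(it->ready_at); // run semantics
        }

        // Remove the operation before invoking it, so the handler may
        // safely start new operations on this service.
        std::function<void()> handler = std::move(it->handler);
        pending_.erase(it);
        handler();
        return 1;
    }

    std::vector<pending> pending_;
};
```

In this model, a poll_one() call made right after start_operation() returns 0 without running anything, which is why the original code has to spin in a loop. A single run_one() call waits for the earliest handler, and a further run_one() returns 0 once nothing is outstanding, which is the condition that ends a while (run_one()) loop.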
The main issue with the original code, as well as with the first proposed fix, is that there are still outstanding handlers in the io_service when async_read_helper() returns. Upon the next call to async_read_helper(), the next handler to be invoked will be a handler from the previous call. The io_service::reset() method only allows the io_service to resume running from a stopped state; it does not remove any handlers already queued in the io_service. To account for this behavior, try using a loop to consume all of the handlers from the io_service. Once all handlers have been consumed, exit the loop and reset the io_service:
// Consume all handlers.
while (_io_service->run_one())
{
    if (_message_received)
    {
        // Message received, so cancel the timer. This will force the completion of
        // handle_timeout, with boost::asio::error::operation_aborted as the error.
        timer.cancel();
    }
    else if (_timeout_triggered)
    {
        // Timeout occurred, so cancel the socket. This will force the completion of
        // handle_read, with boost::asio::error::operation_aborted as the error.
        _socket->cancel();
    }
}
// Reset service, guaranteeing it is in a good state for subsequent runs.
_io_service->reset();
From the caller's perspective, this form of timeout is synchronous, as run_one() blocks. However, work is still being done within the I/O service. An alternative is to use Boost.Asio's support for C++ futures to wait on a future and perform a timeout. This code can be easier to read, but it requires at least one other thread to be processing the I/O service, as the thread waiting on the timeout is no longer processing the I/O service:
// Use an asynchronous operation so that it can be cancelled on timeout.
std::future<std::size_t> read_result = boost::asio::async_read(
    socket, buffer, boost::asio::use_future);
// If timeout occurs, then cancel the operation.
if (read_result.wait_for(std::chrono::seconds(1)) ==
    std::future_status::timeout)
{
    socket.cancel();
}
// Otherwise, the operation completed (with success or error).
else
{
    // If the operation failed, then read_result.get() will throw a
    // boost::system::system_error.
    auto bytes_transferred = read_result.get();
    // process buffer
}
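The wait_for() pattern above can be tried without a socket. The following is a minimal sketch using only the standard library: simulated_read and wait_with_timeout are hypothetical helpers written for this illustration, and a std::async task stands in for the other thread that would be processing the I/O service.

```cpp
#include <chrono>
#include <cstddef>
#include <future>
#include <thread>

// Hypothetical stand-in for an asynchronous read: another thread produces
// the byte count after `delay`, much as a thread running the I/O service
// would complete the real operation.
std::future<std::size_t> simulated_read(std::chrono::milliseconds delay,
                                        std::size_t bytes)
{
    return std::async(std::launch::async, [delay, bytes] {
        std::this_thread::sleep_for(delay);
        return bytes;
    });
}

// Hypothetical helper: waits up to `timeout` for the result. Returns true
// and stores the byte count if the operation finished in time; returns
// false on timeout and leaves `bytes_transferred` untouched.
bool wait_with_timeout(std::future<std::size_t> & result,
                       std::chrono::milliseconds timeout,
                       std::size_t & bytes_transferred)
{
    if (result.wait_for(timeout) == std::future_status::timeout)
    {
        // Real code would cancel the socket here so the pending
        // operation completes with operation_aborted.
        return false;
    }
    // get() rethrows any exception stored by the producing thread.
    bytes_transferred = result.get();
    return true;
}
```

One caveat of this sketch: a future returned by std::async blocks in its destructor until the task finishes, so the timed-out case still waits for the simulated read when the future goes out of scope. With a real socket, cancelling the operation is what lets the pending read complete promptly.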