
Can I read from a socket synchronously using Boost.Asio with a timeout on a multithreaded I/O service?

I have an application that uses Boost.Asio for TCP and UDP socket communications. I understand that the 'A' in "Asio" stands for Asynchronous, so the library is bent toward encouraging you to use asynchronous I/O when possible. I have a few cases where synchronous socket reads are preferable. At the same time, however, I would like to set a timeout on said receive calls, so there's no possibility of the read blocking indefinitely.

This appears to be a pretty common problem among Boost.Asio users, with the following past Stack Overflow questions on the topic:

  • C++ Boost ASIO: how to read/write with a timeout?
  • asio::read with timeout
  • boost asio timeout
  • How to set a timeout on blocking sockets in boost asio?

There may be more. The documentation even includes examples of how to implement synchronous operations with timeouts. They boil down to converting the synchronous operation to an asynchronous one, then starting it in parallel with an asio::deadline_timer. The timer's expiration handler can then cancel the asynchronous read if the timeout expires. It looks something like this (snippet taken from the above linked example):

  std::size_t receive(const boost::asio::mutable_buffer& buffer,
      boost::posix_time::time_duration timeout, boost::system::error_code& ec)
  {
    // Set a deadline for the asynchronous operation.
    deadline_.expires_from_now(timeout);

    // Set up the variables that receive the result of the asynchronous
    // operation. The error code is set to would_block to signal that the
    // operation is incomplete. Asio guarantees that its asynchronous
    // operations will never fail with would_block, so any other value in
    // ec indicates completion.
    ec = boost::asio::error::would_block;
    std::size_t length = 0;

    // Start the asynchronous operation itself. The handle_receive function
    // used as a callback will update the ec and length variables.
    socket_.async_receive(boost::asio::buffer(buffer),
        boost::bind(&client::handle_receive, _1, _2, &ec, &length));

    // Block until the asynchronous operation has completed.
    do io_service_.run_one(); while (ec == boost::asio::error::would_block);

    return length;
  }

This actually is a relatively clean solution: start the asynchronous operations, then manually poll the asio::io_service to execute asynchronous handlers one at a time until either the async_receive() completes or the timer expires.

However, what about the case where the socket's underlying I/O service is already being run in one or more background threads? In that case, there's no guarantee that the handlers for the asynchronous operations would be run by the foreground thread in the above snippet, so run_one() wouldn't return until some later, possibly unrelated, handler executes. This would make the socket reads rather unresponsive.

asio::io_service has a poll_one() function that will check the service's queue without blocking, but I don't see a good way to block the foreground thread (emulating the synchronous call behavior) until the handler executes, except for the case where there are no background threads that are executing asio::io_service::run() already.

I see two potential solutions, neither of which I like:

  1. Use a condition variable or similar construct to make the foreground thread block after starting the asynchronous operations. In the handler for the async_receive() call, signal the condition variable to unblock the thread. This induces some locking for each read, which I would like to avoid, as I'd like to achieve the maximum possible throughput on the UDP socket reads. Otherwise, it is viable, and is probably what I would do unless a superior method presents itself.

  2. Ensure that the socket has its own asio::io_service that is not being run by any background threads. This makes it harder to use asynchronous I/O with the socket in the cases where that is desired.
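As a rough illustration of option 1, here is a minimal sketch that uses only the standard library. The completion_signal class and its member names are hypothetical and not part of Boost.Asio; the assumption is that the async_receive() handler would call notify() from whichever thread runs the io_service, while the foreground thread blocks in wait_for().

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical helper (not part of Boost.Asio): lets a foreground thread
// block until a completion handler on another thread reports its result.
class completion_signal
{
public:
  // Intended to be called from the completion handler.
  void notify(std::size_t bytes_transferred)
  {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      bytes_ = bytes_transferred;
      done_ = true;
    }
    cond_.notify_one();
  }

  // Intended to be called from the foreground thread. Returns true and
  // fills in bytes if notify() was called before the timeout elapsed;
  // returns false and leaves bytes untouched otherwise.
  template <typename Rep, typename Period>
  bool wait_for(std::chrono::duration<Rep, Period> timeout, std::size_t& bytes)
  {
    std::unique_lock<std::mutex> lock(mutex_);
    if (!cond_.wait_for(lock, timeout, [this] { return done_; }))
      return false;
    bytes = bytes_;
    return true;
  }

private:
  std::mutex mutex_;
  std::condition_variable cond_;
  bool done_ = false;
  std::size_t bytes_ = 0;
};
```

If wait_for() returns false, the foreground thread would presumably cancel the pending operation on the socket. It would still need to keep the completion_signal alive until the aborted handler has run, since that handler refers to it.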

Any ideas for other ways to accomplish this in a safe way?


Aside: There are some answers to previous SO questions that advocate using the SO_RCVTIMEO socket option to implement the socket read timeout. This sounds great in theory, but it doesn't seem to work on my platform at least (Ubuntu 12.04, Boost v1.55). I can set the socket timeout, but it won't give the desired effect with Asio. The relevant code is in /boost/asio/detail/impl/socket_ops.ipp:

size_t sync_recvfrom(socket_type s, state_type state, buf* bufs,
    size_t count, int flags, socket_addr_type* addr,
    std::size_t* addrlen, boost::system::error_code& ec)
{
  if (s == invalid_socket)
  {
    ec = boost::asio::error::bad_descriptor;
    return 0;
  }

  // Read some data.
  for (;;)
  {
    // Try to complete the operation without blocking.
    signed_size_type bytes = socket_ops::recvfrom(
        s, bufs, count, flags, addr, addrlen, ec);

    // Check if operation succeeded.
    if (bytes >= 0)
      return bytes;

    // Operation failed.
    if ((state & user_set_non_blocking)
        || (ec != boost::asio::error::would_block
          && ec != boost::asio::error::try_again))
      return 0;

    // Wait for socket to become ready.
    if (socket_ops::poll_read(s, 0, ec) < 0)
      return 0;
  }
}

If a socket read times out, the call to recvfrom() above would fail with EAGAIN or EWOULDBLOCK, which get translated to boost::asio::error::try_again or boost::asio::error::would_block. In this case, the above code will call the poll_read() function, which for my platform looks like:

int poll_read(socket_type s, state_type state, boost::system::error_code& ec)
{
  if (s == invalid_socket)
  {
    ec = boost::asio::error::bad_descriptor;
    return socket_error_retval;
  }

  pollfd fds;
  fds.fd = s;
  fds.events = POLLIN;
  fds.revents = 0;
  int timeout = (state & user_set_non_blocking) ? 0 : -1;
  clear_last_error();
  int result = error_wrapper(::poll(&fds, 1, timeout), ec);
  if (result == 0)
    ec = (state & user_set_non_blocking)
      ? boost::asio::error::would_block : boost::system::error_code();
  else if (result > 0)
    ec = boost::system::error_code();
  return result;
}

I snipped out the code conditionally compiled for other platforms. As you can see, if the socket is not a non-blocking socket, it ends up calling poll() with an infinite timeout, therefore blocking until the socket has data to be read (and foiling the attempt at a timeout). Thus, the SO_RCVTIMEO option is not effective.

Jason R asked Apr 09 '14 14:04



1 Answer

Boost.Asio's support for futures may provide an elegant solution. When an asynchronous operation is provided the boost::asio::use_future value as its completion handler, the initiating function will return a std::future object that will receive the result of the operation. Additionally, if the operation completes with failure, the error_code is converted into a system_error and passed to the caller via the future.

In the Boost.Asio C++11 Futures daytime client example, a dedicated thread runs the io_service, and the main thread initiates asynchronous operations and then synchronously waits for them to complete, as shown below:

std::array<char, 128> recv_buf;
udp::endpoint sender_endpoint;
std::future<std::size_t> recv_length =
  socket.async_receive_from(
      boost::asio::buffer(recv_buf),
      sender_endpoint,
      boost::asio::use_future);

// Do other things here while the receive completes.

std::cout.write(
    recv_buf.data(),
    recv_length.get()); // Blocks until receive is complete.

When using futures, the overall approach to implementing a synchronous read with a timeout is the same as before. Instead of using a synchronous read, one would use an asynchronous read and asynchronously wait on a timer. The only minor change is that instead of blocking on the io_service or periodically checking for the predicate, one would invoke future::get() to block until the operation has completed with success or failure (such as a timeout).
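A minimal sketch of the timeout part, assuming the future came from an operation started with boost::asio::use_future. Note that this swaps in a different technique from the one described above: it uses std::future::wait_for() as a simpler stand-in for a separate asynchronous timer. wait_or_cancel is a hypothetical helper, not part of Boost.Asio, and the cancel callable is assumed to be something like [&] { socket.cancel(); }.

```cpp
#include <chrono>
#include <future>

// Hypothetical helper (not part of Boost.Asio): waits up to `timeout` for
// the future to become ready. If it is not ready in time, invokes `cancel`
// so the caller can abort the pending operation.
// Returns true if the result was ready in time, false otherwise.
template <typename T, typename Cancel>
bool wait_or_cancel(std::future<T>& result,
    std::chrono::milliseconds timeout, Cancel cancel)
{
  if (result.wait_for(timeout) == std::future_status::ready)
    return true;

  // Timed out: ask the caller-supplied callable to cancel the operation.
  cancel();
  return false;
}
```

When the helper returns false, a later call to get() on the same future would be expected to block until the aborted handler runs and then throw the system_error carrying the cancellation error, so the caller would probably want to wrap that call in a try block.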

If C++11 is not available, then the return type for asynchronous operations can be customized for Boost.Thread's future, as demonstrated in this answer.

Tanner Sansbury answered Sep 19 '22 13:09
