C++ boost asio Windows file handle async_read_until infinite loop - no eof

I'm using boost 1.50 with VS2010, reading using a Windows file HANDLE (which seems to be relatively uncommon compared to asio use with sockets).

Problem

The handle_read callback gets as far as line 8, then returns the first part of that line with all of line 1 appended; further callbacks cycle through from line 2 again, ad nauseam:

  • open a short text file (below)
  • get expected handle_read callbacks with correct content for lines 1 through 7
  • the next callback has a longer-than-expected bytes-read length parameter
  • though not using length, getline extracts a correspondingly longer line from the asio stream buffer
  • extracted content switches mid-line to repeat the first line from the input file
  • further handle_read callbacks recycle lines 2 through 7, then the "long hybrid" line problem happens
  • ad nauseam

Input

LINE 1 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789
LINE 2 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789
...3--E similarly...
LINE F abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789

Output

Here are the first 15 lines of output (it continues forever):

line #1, length 70, getline() [69] 'LINE 1 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #2, length 70, getline() [69] 'LINE 2 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
...line #3 through #6 are fine too...
line #7, length 70, getline() [69] 'LINE 7 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #8, length 92, getline() [91] 'LINE 8 abcdefghijklmnoLINE 1 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #9, length 70, getline() [69] 'LINE 2 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
...line #10 through #13 are fine...
line #14, length 70, getline() [69] 'LINE 7 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #15, length 92, getline() [91] 'LINE 8 abcdefghijklmnoLINE 1 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
...

Please note how output lines #8 and #15 are a mix of input LINE 8 and LINE 1.

The code

#include "stdafx.h"

#include <cassert>
#include <iostream>
#include <string>

#include <boost/asio.hpp>
#include <boost/bind.hpp>

#include <Windows.h>
#include <WinBase.h>

class AsyncReader
{
  public:
    AsyncReader(boost::asio::io_service& io_service, HANDLE handle)
      : io_service_(io_service),
        input_buffer(/*size*/ 8192),
        input_handle(io_service, handle)
    {
        start_read();
    }

    void start_read()
    {
        boost::asio::async_read_until(input_handle, input_buffer, '\n',
            boost::bind(&AsyncReader::handle_read, this,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));
    }

    void handle_read(const boost::system::error_code& error, std::size_t length);
    // void handle_write(const boost::system::error_code& error);

  private:
    boost::asio::io_service& io_service_;
    boost::asio::streambuf input_buffer;
    boost::asio::windows::stream_handle input_handle;
};

void AsyncReader::handle_read(const boost::system::error_code& error, std::size_t length)
{
    if (!error)
    {
        static int count = 0;
        ++count;

        // method 1: (same problem)
        // const char* pStart = boost::asio::buffer_cast<const char*>(input_buffer.data());
        // std::string s(pStart, length);
        // input_buffer.consume(length);

        // method 2:
        std::istream is(&input_buffer);
        std::string s;
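        // Extraction kept out of assert(), which compiles to nothing under NDEBUG.
        if (!std::getline(is, s))
        {
            std::cerr << "getline() failed!\n";
            return;
        }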

        std::cout << "line #" << count << ", length " << length << ", getline() [" << s.size() << "] '" << s << "'\n";

        start_read();
    }
    else if (error == boost::asio::error::not_found)
        std::cerr << "Did not receive ending character!\n";
    else
        std::cerr << "Misc error during read!\n";
}

int _tmain(int argc, _TCHAR* argv[])
{
    boost::asio::io_service io_service;

    HANDLE handle = ::CreateFile(TEXT("c:/temp/input.txt"),
                                 GENERIC_READ,
                                 0, // share mode
                                 NULL, // security attribute: NULL = default
                                 OPEN_EXISTING, // creation disposition
                                 FILE_FLAG_OVERLAPPED,
                                 NULL // template file
                                );

    AsyncReader obj(io_service, handle);

    io_service.run();

    std::cout << "Normal termination\n";
    getchar();
    return 0;
}

My thoughts

  • It might be something in the CreateFile options - it didn't work at all until I switched to FILE_FLAG_OVERLAPPED - not sure if there are other requirements that don't even manifest as errors...?
  • I've tried input_buffer.commit and even .consume - not sure if there's something like that I'm supposed to do, even though all the example code I could find (for sockets) suggests getline takes care of that...
  • Exasperation / I miss Linux....
Tony Delroy asked Jul 18 '13 14:07


1 Answer

This mailing list post describes the same problem. Opening a file through CreateFile with FILE_FLAG_OVERLAPPED enables asynchronous I/O, but it does not make the handle behave as a stream in the context of Boost.Asio. For streams, Boost.Asio implements read_some as read_some_at with the offset always being 0, so every read on a file handle starts again at the beginning of the file. This is the source of the problem, as the ReadFile() documentation states:

For files that support byte offsets, you must specify a byte offset at which to start reading from the file.


Adapting to Type Requirements

Boost.Asio is written very generically, often requiring arguments to meet a certain type requirement rather than be a specific type. Therefore, it is often possible to adapt either the I/O object or its service to obtain the desired behavior. First, one must identify what the adapted interface needs to support. In this case, async_read_until accepts any type fulfilling the type requirements of AsyncReadStream. AsyncReadStream's requirements are fairly basic, requiring a void async_read_some(MutableBufferSequence, ReadHandler) member function.

As the offset value will need to be tracked throughout the composed async_read_until operation, a simple type meeting the requirements of ReadHandler can be introduced that will wrap an application's ReadHandler, and update the offset accordingly.

namespace detail {
/// @brief Handler to wrap asynchronous read_some_at operations.
template <typename Handler>
class read_some_offset_handler
{
public:
  read_some_offset_handler(Handler handler, boost::uint64_t& offset)
    : handler_(handler),
      offset_(offset)
  {}

  void operator()(
    const boost::system::error_code& error,
    std::size_t bytes_transferred)
  {
    offset_ += bytes_transferred;

    // If bytes were transferred, then set the error code as success.
    // EOF will be detected on next read.  This is to account for
    // the read_until algorithm behavior.
    const boost::system::error_code result_ec =
      (error && bytes_transferred)
      ? make_error_code(boost::system::errc::success) : error;

    handler_(result_ec, bytes_transferred);
  }

// Left public (rather than private) so asio_handler_invoke below can reach handler_.
//private:
  Handler handler_;
  boost::uint64_t& offset_;
};

/// @brief Hook that allows the wrapped handler to be invoked
///        within specific context.  This is critical to support
///        composed operations being invoked within a strand.
template <typename Function,
          typename Handler>
void asio_handler_invoke(
  Function function,
  detail::read_some_offset_handler<Handler>* handler)
{
  boost_asio_handler_invoke_helpers::invoke(
    function, handler->handler_);
}

} // namespace detail

The asio_handler_invoke hook will be found through ADL to support invoking user handlers in the proper context. This is critical for thread safety when a composed operation is being invoked within a strand. For more details on composed operations and strands, see this answer.

The following class will adapt boost::asio::windows::random_access_handle to meet the type requirements of AsyncReadStream.

/// @brief Adapts AsyncRandomAccessReadDevice to support AsyncReadStream.
template <typename AsyncRandomAccessReadDevice>
class basic_adapted_stream
  : public AsyncRandomAccessReadDevice
{
public:
  basic_adapted_stream(
    boost::asio::io_service& io_service,
    HANDLE handle
  )
    : AsyncRandomAccessReadDevice(io_service, handle),
      offset_(0)
  {}

  template<typename MutableBufferSequence,
           typename ReadHandler>
  void async_read_some(
    const MutableBufferSequence& buffers,
    ReadHandler handler)
  {
    async_read_at(*this, offset_, buffers, 
      detail::read_some_offset_handler<ReadHandler>(handler, offset_));
  }

private:
  boost::uint64_t offset_;
};

Alternatively, boost::asio::windows::basic_stream_handle can be provided a custom type meeting the requirements of StreamHandleService types, and implement async_read_some in terms of async_read_some_at.

/// @brief Service that implements async_read_some with async_read_some_at.
class offset_stream_handle_service
  : public boost::asio::windows::stream_handle_service
{
private:
  // The type of the platform-specific implementation.
  typedef boost::asio::detail::win_iocp_handle_service service_impl_type;
public:

  /// The unique service identifier.
  static boost::asio::io_service::id id;

  /// Construct a new stream handle service for the specified io_service.
  explicit offset_stream_handle_service(boost::asio::io_service& io_service)
    : boost::asio::windows::stream_handle_service(io_service),
      service_impl_(io_service),
      offset_(0)
  {}

  /// Start an asynchronous read.
  template <typename MutableBufferSequence,
            typename ReadHandler>
  void
  async_read_some(
    implementation_type& impl,
    const MutableBufferSequence& buffers,
    ReadHandler handler)
  {
    // Implement async_read_some in terms of async_read_some_at.  The provided
    // ReadHandler will be hoisted in an internal handler so that offset_ can
    // be properly updated.
    service_impl_.async_read_some_at(impl, offset_, buffers, 
      detail::read_some_offset_handler<ReadHandler>(handler, offset_));
  }
private:
  // The platform-specific implementation.
  service_impl_type service_impl_;
  boost::uint64_t offset_;
};

boost::asio::io_service::id offset_stream_handle_service::id;

I have opted for simplicity in the example code, but the same service instance will be shared by multiple I/O objects. Thus, offset_stream_handle_service would need to manage one offset per handle, rather than a single offset_ member, to function properly when multiple I/O objects use the service.

To use the adapted types, modify the AsyncReader::input_handle member variable to be either a basic_adapted_stream<boost::asio::windows::random_access_handle> (adapted I/O object) or boost::asio::windows::basic_stream_handle<offset_stream_handle_service> (adapted service).


Example

Here is the complete example based on the original code, only modifying the type of AsyncReader::input_handle:

#include "stdafx.h"

#include <cassert>
#include <iostream>
#include <string>

#include <boost/asio.hpp>
#include <boost/bind.hpp>

#include <Windows.h>
#include <WinBase.h>


namespace detail {
/// @brief Handler to wrap asynchronous read_some_at operations.
template <typename Handler>
class read_some_offset_handler
{
public:
  read_some_offset_handler(Handler handler, boost::uint64_t& offset)
    : handler_(handler),
      offset_(offset)
  {}

  void operator()(
    const boost::system::error_code& error,
    std::size_t bytes_transferred)
  {
    offset_ += bytes_transferred;

    // If bytes were transferred, then set the error code as success.
    // EOF will be detected on next read.  This is to account for
    // the read_until algorithm behavior.
    const boost::system::error_code result_ec =
      (error && bytes_transferred)
      ? make_error_code(boost::system::errc::success) : error;

    handler_(result_ec, bytes_transferred);
  }

// Left public (rather than private) so asio_handler_invoke below can reach handler_.
//private:
  Handler handler_;
  boost::uint64_t& offset_;
};

/// @brief Hook that allows the wrapped handler to be invoked
///        within specific context.  This is critical to support
///        composed operations being invoked within a strand.
template <typename Function,
          typename Handler>
void asio_handler_invoke(
  Function function,
  detail::read_some_offset_handler<Handler>* handler)
{
  boost_asio_handler_invoke_helpers::invoke(
    function, handler->handler_);
}

} // namespace detail

/// @brief Adapts AsyncRandomAccessReadDevice to support AsyncReadStream.
template <typename AsyncRandomAccessReadDevice>
class basic_adapted_stream
  : public AsyncRandomAccessReadDevice
{
public:
  basic_adapted_stream(
    boost::asio::io_service& io_service,
    HANDLE handle
  )
    : AsyncRandomAccessReadDevice(io_service, handle),
      offset_(0)
  {}

  template<typename MutableBufferSequence,
           typename ReadHandler>
  void async_read_some(
    const MutableBufferSequence& buffers,
    ReadHandler handler)
  {
    async_read_at(*this, offset_, buffers, 
      detail::read_some_offset_handler<ReadHandler>(handler, offset_));
  }

private:
  boost::uint64_t offset_;
};

/// @brief Service that implements async_read_some with async_read_some_at.
class offset_stream_handle_service
  : public boost::asio::windows::stream_handle_service
{
private:
  // The type of the platform-specific implementation.
  typedef boost::asio::detail::win_iocp_handle_service service_impl_type;
public:

  /// The unique service identifier.
  static boost::asio::io_service::id id;

  /// Construct a new stream handle service for the specified io_service.
  explicit offset_stream_handle_service(boost::asio::io_service& io_service)
    : boost::asio::windows::stream_handle_service(io_service),
      service_impl_(io_service),
      offset_(0)
  {}

  /// Start an asynchronous read.
  template <typename MutableBufferSequence,
            typename ReadHandler>
  void
  async_read_some(
    implementation_type& impl,
    const MutableBufferSequence& buffers,
    ReadHandler handler)
  {
    // Implement async_read_some in terms of async_read_some_at.  The provided
    // ReadHandler will be hoisted in an internal handler so that offset_ can
    // be properly updated.
    service_impl_.async_read_some_at(impl, offset_, buffers, 
      detail::read_some_offset_handler<ReadHandler>(handler, offset_));
  }
private:
  // The platform-specific implementation.
  service_impl_type service_impl_;
  boost::uint64_t offset_;
};

boost::asio::io_service::id offset_stream_handle_service::id;

#ifndef ADAPT_IO_SERVICE
typedef basic_adapted_stream<
    boost::asio::windows::random_access_handle> adapted_stream;
#else
typedef boost::asio::windows::basic_stream_handle<
    offset_stream_handle_service> adapted_stream;
#endif

class AsyncReader
{
  public:
    AsyncReader(boost::asio::io_service& io_service, HANDLE handle)
      : io_service_(io_service),
        input_buffer(/*size*/ 8192),
        input_handle(io_service, handle)
    {
        start_read();
    }

    void start_read()
    {
        boost::asio::async_read_until(input_handle, input_buffer, '\n',
            boost::bind(&AsyncReader::handle_read, this,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));
    }

    void handle_read(const boost::system::error_code& error, std::size_t length);
    // void handle_write(const boost::system::error_code& error);

  private:
    boost::asio::io_service& io_service_;
    boost::asio::streambuf input_buffer;
    adapted_stream input_handle;
};

void AsyncReader::handle_read(const boost::system::error_code& error, std::size_t length)
{
    if (!error)
    {
        static int count = 0;
        ++count;

        // method 1: (same problem)
        // const char* pStart = boost::asio::buffer_cast<const char*>(input_buffer.data());
        // std::string s(pStart, length);
        // input_buffer.consume(length);

        // method 2:
        std::istream is(&input_buffer);
        std::string s;
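        // Extraction kept out of assert(), which compiles to nothing under NDEBUG.
        if (!std::getline(is, s))
        {
            std::cerr << "getline() failed!\n";
            return;
        }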

        std::cout << "line #" << count << ", length " << length << ", getline() [" << s.size() << "] '" << s << "'\n";

        start_read();
    }
    else if (error == boost::asio::error::not_found)
        std::cerr << "Did not receive ending character!\n";
    else
        std::cerr << "Misc error during read!\n";
}

int _tmain(int argc, _TCHAR* argv[])
{
    boost::asio::io_service io_service;

    HANDLE handle = ::CreateFile(TEXT("c:/temp/input.txt"),
                                 GENERIC_READ,
                                 0, // share mode
                                 NULL, // security attribute: NULL = default
                                 OPEN_EXISTING, // creation disposition
                                 FILE_FLAG_OVERLAPPED,
                                 NULL // template file
                                );

    AsyncReader obj(io_service, handle);

    io_service.run();

    std::cout << "Normal termination\n";
    getchar();
    return 0;
}

Which produces the following output when using the input from the original question:

line #1, length 70, getline() [69] 'LINE 1 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #2, length 70, getline() [69] 'LINE 2 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #3, length 70, getline() [69] 'LINE 3 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #4, length 70, getline() [69] 'LINE 4 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #5, length 70, getline() [69] 'LINE 5 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #6, length 70, getline() [69] 'LINE 6 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #7, length 70, getline() [69] 'LINE 7 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #8, length 70, getline() [69] 'LINE 8 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #9, length 70, getline() [69] 'LINE 9 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #10, length 70, getline() [69] 'LINE 0 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #11, length 70, getline() [69] 'LINE A abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #12, length 70, getline() [69] 'LINE B abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #13, length 70, getline() [69] 'LINE C abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #14, length 70, getline() [69] 'LINE D abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #15, length 70, getline() [69] 'LINE E abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
Misc error during read!
Normal termination

My input file did not have a \n character at the end of LINE F. Thus, AsyncReader::handle_read() gets invoked with an error of boost::asio::error::eof while input_buffer still contains LINE F. After modifying the final else case to print more information:

...
else
{
    std::cerr << "Error: " << error.message() << "\n";

    if (std::size_t buffer_size = input_buffer.size())
    {
        boost::asio::streambuf::const_buffers_type bufs = input_buffer.data();
        std::string contents(boost::asio::buffers_begin(bufs),
                             boost::asio::buffers_begin(bufs) + buffer_size);
        std::cerr << "stream contents: '" << contents << "'\n";
    }
}

I get the following output:

line #1, length 70, getline() [69] 'LINE 1 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #2, length 70, getline() [69] 'LINE 2 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #3, length 70, getline() [69] 'LINE 3 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #4, length 70, getline() [69] 'LINE 4 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #5, length 70, getline() [69] 'LINE 5 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #6, length 70, getline() [69] 'LINE 6 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #7, length 70, getline() [69] 'LINE 7 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #8, length 70, getline() [69] 'LINE 8 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #9, length 70, getline() [69] 'LINE 9 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #10, length 70, getline() [69] 'LINE 0 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #11, length 70, getline() [69] 'LINE A abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #12, length 70, getline() [69] 'LINE B abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #13, length 70, getline() [69] 'LINE C abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #14, length 70, getline() [69] 'LINE D abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #15, length 70, getline() [69] 'LINE E abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
Error: End of file
stream contents: 'LINE F abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
Normal termination
Tanner Sansbury answered Sep 19 '22 06:09