I want to create an autonomous thread devoted solely to receiving data from a UDP socket using the Boost libraries (Asio). This thread should run an infinite loop that is triggered whenever data arrives on the UDP socket. My application requires an asynchronous receive operation.
If I use the synchronous function receive_from, everything works as expected.
However, if I use async_receive_from, the handler is never called. Since I use a semaphore to detect that data has been received, the program blocks and the loop is never triggered.
I have verified (with a network analyzer) that the sender device properly sends the data on the UDP socket.
I have isolated the problem in the following code.
#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <boost/interprocess/sync/interprocess_semaphore.hpp>
#include <iostream>
typedef boost::interprocess::interprocess_semaphore Semaphore;
using namespace boost::asio::ip;
class ReceiveUDP
{
public:
    boost::thread*  m_pThread;
    boost::asio::io_service         m_io_service;
    udp::endpoint                   m_local_endpoint;
    udp::endpoint                   m_sender_endpoint;
    udp::socket                     m_socket;
    size_t      m_read_bytes;
    Semaphore   m_receive_semaphore;
    ReceiveUDP() :
        m_socket(m_io_service),
        m_local_endpoint(boost::asio::ip::address::from_string("192.168.0.254"), 11),
        m_sender_endpoint(boost::asio::ip::address::from_string("192.168.0.11"), 5550),
        m_receive_semaphore(0)
    {
        Start();
    }
    void Start()
    {
        m_pThread = new boost::thread(&ReceiveUDP::_ThreadFunction, this);
    }
    void _HandleReceiveFrom(
        const boost::system::error_code& error,
        size_t                                  received_bytes)
    {
        m_receive_semaphore.post();
        m_read_bytes = received_bytes;
    }
    void _ThreadFunction()
    {
        try
        {
            boost::array<char, 100> recv_buf;
            m_socket.open(udp::v4());
            m_socket.bind(m_local_endpoint);
            m_io_service.run();
            while (1)
            {
#if 1 // THIS WORKS
                m_read_bytes = m_socket.receive_from(
                    boost::asio::buffer(recv_buf), m_sender_endpoint);
#else // THIS DOESN'T WORK
                m_socket.async_receive_from(
                    boost::asio::buffer(recv_buf),
                    m_sender_endpoint,
                    boost::bind(&ReceiveUDP::_HandleReceiveFrom, this,
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));
                /* The program locks on this wait since _HandleReceiveFrom
                is never called. */
                m_receive_semaphore.wait();
#endif
                std::cout.write(recv_buf.data(), m_read_bytes);
            }
            m_socket.close();
        }
        catch (std::exception& e)
        {
            std::cerr << e.what() << std::endl;
        }
    }
};
int main()
{
    ReceiveUDP  receive_thread;
    receive_thread.m_pThread->join();
}
A timed_wait on the semaphore would be preferable; for debugging purposes, however, I have used a blocking wait as in the code above.
Did I miss something? Where is my mistake?
Your call to io_service.run() returns immediately because the io_service has no work to do at that point. The code then enters the while loop and calls m_socket.async_receive_from. By then the io_service is no longer running, so it never reads the data and never calls your handler.
You need to schedule the work before calling io_service.run(), i.e.:
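// Configure the socket and queue the first asynchronous read.
// recv_buf must outlive the operation (e.g. make it a class member).
m_socket.open(udp::v4());
m_socket.bind(m_local_endpoint);
m_socket.async_receive_from(
    boost::asio::buffer(recv_buf),
    m_sender_endpoint,
    boost::bind(&ReceiveUDP::_HandleReceiveFrom, this,
    boost::asio::placeholders::error,
    boost::asio::placeholders::bytes_transferred));
// Start the io_service now that it has work to do. run() blocks, so call
// it from a thread other than the one that waits on the semaphore.
m_io_service.run();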
The handler function will do the following:
void _HandleReceiveFrom(
    const boost::system::error_code& error,
    size_t received_bytes)
{
    // publish the byte count before waking the waiting thread
    m_read_bytes = received_bytes;
    m_receive_semaphore.post();
    // schedule the next asynchronous read
    m_socket.async_receive_from(
        boost::asio::buffer(recv_buf),
        m_sender_endpoint,
        boost::bind(&ReceiveUDP::_HandleReceiveFrom, this,
        boost::asio::placeholders::error,
        boost::asio::placeholders::bytes_transferred));
}
Your thread then simply waits for the semaphore:
while (1)
{
    m_receive_semaphore.wait();
    std::cout.write(recv_buf.data(), m_read_bytes);
}
Notes: