The boost::asio
documentation for async_receive()
states that it supports "receiving into multiple buffers in one go", and while I can code this I can't actually see how (or if) it works.
We have a situation where one of our vendors is sending us many thousands of UDP packets per second, enough that we're seeing the "Packets Received Discarded" spiking in certain situations.
It would be ideal if we really could fill multiple buffers in one async_receive()
call, but during testing it seems that even if multiple buffers are specified, the handler is only called for one datagram.
I've included my test code, sorry it's so verbose, but I needed it to be flexible to listen on multiple interfaces/multicasts.
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/lexical_cast.hpp>
#include <memory>
#include <algorithm>
#include <vector>
#include <string>
#include <cstdint>
// configuration options...
std::string nic;
std::string mc;
uint16_t port = 0;
uint16_t buffer_size = 0;
uint32_t socket_buffer_size = 0;
uint32_t scat_cnt = 1;
// The raw data buffer
std::vector<uint8_t> buffer;
// The scatter/gather buffer
std::vector<boost::asio::mutable_buffer> gather_buffer;
boost::asio::io_service svc;
std::unique_ptr<boost::asio::ip::udp::socket> socket_;
size_t messages_received = 0;
size_t bytes_received = 0;
bool parse_command_line(std::vector<std::string> command_line);
void on_receive(const boost::system::error_code& ec, size_t bytes)
{
if(!ec)
{
socket_->async_receive(
gather_buffer,
[] (const boost::system::error_code& ec, size_t bytes)
{
on_receive(ec, bytes);
});
++messages_received;
bytes_received += bytes;
if(0 == messages_received % 1000)
{
std::cout << "Received: " << messages_received << " messages, " << bytes_received << " bytes.\n";
}
}
else
{
std::cout << "Error: " << ec.message() << '\n';
}
}
int main(int argc, char** argv)
{
if(parse_command_line(std::vector<std::string>(argv, argv+argc)))
{
try
{
std::cout << "Resizing segment buffer to: " << buffer_size << std::endl;
buffer.resize(buffer_size * scat_cnt);
for(uint32_t x = 0; x < scat_cnt; ++x)
{
gather_buffer.push_back(
boost::asio::buffer(buffer.data() + (buffer_size * x), buffer_size));
}
std::cout << "Setting up receiving socket." << std::endl;
socket_.reset(new boost::asio::ip::udp::socket(svc));
socket_->open(boost::asio::ip::udp::v4());
socket_->set_option(boost::asio::socket_base::reuse_address(true));
std::cout << "Binding to local NIC: " << nic << std::endl;
socket_->bind(boost::asio::ip::udp::endpoint(boost::asio::ip::address::from_string(nic), port));
boost::asio::socket_base::non_blocking_io no_block(true);
socket_->io_control(no_block);
std::cout << "Setting socket buffer size to " << socket_buffer_size << std::endl;
boost::asio::socket_base::receive_buffer_size sock_bf_sz(socket_buffer_size);
socket_->set_option(sock_bf_sz);
std::cout << "Joining multicast " << mc << " on " << nic << std::endl;
boost::asio::ip::multicast::join_group jg(boost::asio::ip::address_v4::from_string(mc), boost::asio::ip::address_v4::from_string(nic));
socket_->set_option(jg);
std::cout << "Listening..." << std::endl;
socket_->async_receive(
gather_buffer,
[] (const boost::system::error_code& ec, size_t bytes)
{
on_receive(ec, bytes);
});
std::unique_ptr<boost::asio::io_service::work> w(new boost::asio::io_service::work(svc));
std::cout << "Starting boost proactor..." << std::endl;
boost::thread thread([&] () { svc.run(); });
boost::this_thread::sleep_for(boost::chrono::seconds(60));
w.reset();
thread.join();
}
catch(boost::system::error_code& ec)
{
std::cout << "Boost error: " << ec.message() << '\n';
}
catch(...)
{
std::cout << "Unknown Error!\n";
}
}
return 0;
}
bool parse_command_line(std::vector<std::string> command_line)
{
for(size_t idx = 0, max_switches = command_line.size();
idx < max_switches; ++idx)
{
auto& curr = command_line[idx];
std::transform(curr.begin(), curr.end(), curr.begin(), ::tolower);
if(curr == "-nic" && ++idx < max_switches)
{
nic = command_line[idx];
}
else if(curr == "-multicast" && ++idx < max_switches)
{
mc = command_line[idx];
}
else if(curr == "-port" && ++idx < max_switches)
{
port = boost::lexical_cast<uint16_t>(command_line[idx]);
}
else if(curr == "-bfsz" && ++idx < max_switches)
{
buffer_size = boost::lexical_cast<uint16_t>(command_line[idx]);
}
else if(curr == "-sockbfsz" && ++idx < max_switches)
{
socket_buffer_size = boost::lexical_cast<uint32_t>(command_line[idx]);
}
else if(curr == "-scattercnt" && ++idx < max_switches)
{
scat_cnt = boost::lexical_cast<uint32_t>(command_line[idx]);
}
}
std::cout
<< "NIC: " << nic << '\n'
<< "MC: " << mc << '\n'
<< "Port: " << port << '\n'
<< "Segment Size: " << buffer_size << '\n'
<< "Socket Buffer Size: " << socket_buffer_size << '\n'
<< "Scatter/Gather: " << scat_cnt << std::endl;
return
!nic.empty() &&
!mc.empty() &&
port != 0 &&
buffer_size != 0 &&
socket_buffer_size != 0
;
}
It'll receive into multiple buffers, but you're never going to get more than one datagram at a time. That's just how recv
works and I think people would be pretty surprised if its behavior suddenly changed. The intent of gather is to split a single packet into multiple buffers, which may be useful if you're expecting to receive data that is conceptually segmented, but it is not for receiving multiple buffers. This is basically a wrapper around recvmsg
.
Maybe try not using boost::asio?
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With