I am in the process of writing an article about using blocking vs. non-blocking sockets. I am currently doing some experiments using threads and blocking sockets and have turned up some interesting results that I am not sure how to explain.
Note: I know that modern servers use an event-driven model with non-blocking sockets to achieve much better performance, and I am working in that direction, but I want to get the baseline numbers first.
The question I think I should ask is below. But any input on what is happening, what I should actually be asking, or what I need to time/measure/examine would be gratefully accepted.
Experiments are running on Amazon:
Instance type   vCPUs   Memory (GiB)   Storage (GB)   Network
c3.2xlarge      8       15             2 x 80 SSD     High
I am using siege to load test the server:
> wc data.txt
0 1 32 data.txt
> siege --delay=0.001 --time=1m --concurrent=<concurrency> -H 'Content-Length: 32' -q '<host>/message POST < data.txt'
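I have four versions of the code. Each is the most basic type of HTTP server: no matter what you request, you get the same response (this is basically to test throughput).
Single Threaded: each accepted request is handled on the main thread before the next one is accepted.
Multi Threaded: each accepted request is handled by its own std::thread, which is detached.
Thread Pool: a fixed-size pool of std::thread. Each accepted request creates a job that is added to a job queue for processing by the thread pool.
std::async(): each accepted request is launched with std::async().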
Actual concurrent sizes tried.
1, 2, 4, 8, 16, 32, 48, 64, 80, 96, 112, 128, 144, 160, 176, 192, 208, 224, 240, 255
I was surprised at the performance of the "Multi" Threaded version. So I doubled the size of the thread pool version to see what happened.
ThreadQueue jobs(std::thread::hardware_concurrency());
// Changed this line to:
ThreadQueue jobs(std::thread::hardware_concurrency() * 2);
That's why you see two lines for thread pool in the graphs.
It's not unexpected that the standard library std::async() is the best version. But I am totally dumbfounded by the Multi Threaded version having basically the same performance.
This version (Multi Threaded) creates a new thread for every accepted incoming connection and then simply detaches the thread (allowing it to run to completion). As the concurrency reaches 255 we will have 255 background threads running in the process.
Given the short runtime of Sock::worker(), I cannot believe the cost of creating a thread is negligible in comparison to this work. Also, because it maintains a similar performance to std::async(), it seems to suggest that there is some re-use going on behind the scenes.
Does anybody have any knowledge about the standards requirements for thread re-use and what I should expect the re-use behavior to be?
At what point will the blocking model break down? At 255 concurrent requests I was not expecting the threading model to keep up. I obviously need to reset my expectations here.
The socket wrapper code is a very thin layer over standard sockets (just throwing exceptions when things go wrong). The current code is here if needed but I don't think it matters.
The full source of this code is available here.
This is the code that is common to all the servers. It receives an accepted socket object (via move) and writes the data object to that socket.
void worker(DataSocket&& accepted, ServerSocket& server, std::string const& data, int& finished)
{
    DataSocket      accept(std::move(accepted));
    HTTPServer      acceptHTTPServer(accept);
    try
    {
        std::string message;
        acceptHTTPServer.recvMessage(message);
        // std::cout << message << "\n";
        if (!finished && message == "Done")
        {
            finished = 1;
            server.stop();
            acceptHTTPServer.sendMessage("", "Stoped");
        }
        else
        {
            acceptHTTPServer.sendMessage("", data);
        }
    }
    catch(DropDisconnectedPipe const& e)
    {
        std::cerr << "Pipe Disconnected: " << e.what() << "\n";
    }
}
Single Threaded:
int main(int argc, char* argv[])
{
    // Builds a string that is sent back with each response.
    std::string data = Sock::commonSetUp(argc, argv);
    int finished = 0;
    Sock::ServerSocket server(8080);
    while(!finished)
    {
        Sock::DataSocket accept = server.accept();
        // Simply sends "data" back over http.
        Sock::worker(std::move(accept), server, data, finished);
    }
}
Multi Threaded:
int main(int argc, char* argv[])
{
    std::string data = Sock::commonSetUp(argc, argv);
    int finished = 0;
    Sock::ServerSocket server(8080);
    while(!finished)
    {
        Sock::DataSocket accept = server.accept();
        // One detached thread per accepted connection.
        std::thread work(Sock::worker, std::move(accept), std::ref(server), std::ref(data), std::ref(finished));
        work.detach();
    }
}
Thread Pool:
int main(int argc, char* argv[])
{
    std::string data = Sock::commonSetUp(argc, argv);
    int finished = 0;
    Sock::ServerSocket server(8080);
    std::cerr << "Concurrency: " << std::thread::hardware_concurrency() << "\n";
    ThreadQueue jobs(std::thread::hardware_concurrency());
    while(!finished)
    {
        Sock::DataSocket accept = server.accept();
        // Had some issues with storing a lambda that captured
        // a move only object so I created WorkJob as a simple
        // functor instead of the lambda.
        jobs.startJob(WorkJob(std::move(accept), server, data, finished));
    }
}
Auxiliary code to control the pool:
class WorkJob
{
    Sock::DataSocket    accept;
    Sock::ServerSocket& server;
    std::string const&  data;
    int&                finished;
public:
    WorkJob(Sock::DataSocket&& accept, Sock::ServerSocket& server, std::string const& data, int& finished)
        : accept(std::move(accept))
        , server(server)
        , data(data)
        , finished(finished)
    {}
    WorkJob(WorkJob&& rhs)
        : accept(std::move(rhs.accept))
        , server(rhs.server)
        , data(rhs.data)
        , finished(rhs.finished)
    {}
    void operator()()
    {
        Sock::worker(std::move(accept), server, data, finished);
    }
};
class ThreadQueue
{
    using WorkList = std::deque<WorkJob>;

    std::vector<std::thread>    threads;
    std::mutex                  safe;
    std::condition_variable     cond;
    WorkList                    work;
    int                         finished;

    void doWork()
    {
        for(;;)
        {
            std::unique_lock<std::mutex> lock(safe);
            // Sleep until there is a job to run or the pool is shutting down.
            cond.wait(lock, [this](){return !(this->work.empty() && !this->finished);});
            if (finished)
            {
                // Shutting down: the queue may be empty, so never call front() here.
                return;
            }
            WorkJob job = std::move(work.front());
            work.pop_front();
            // Release the lock so other workers can take jobs while this one runs.
            lock.unlock();
            job();
        }
    }
public:
    void startJob(WorkJob&& item)
    {
        std::unique_lock<std::mutex> lock(safe);
        work.push_back(std::move(item));
        cond.notify_one();
    }
    ThreadQueue(int count)
        : threads(count)
        , finished(false)
    {
        for(int loop = 0;loop < count; ++loop)
        {
            threads[loop] = std::thread(&ThreadQueue::doWork, this);
        }
    }
    ~ThreadQueue()
    {
        {
            std::unique_lock<std::mutex> lock(safe);
            finished = true;
        }
        cond.notify_all();
        // A joinable std::thread must be joined (or detached) before it is
        // destroyed, otherwise std::terminate() is called.
        for(auto& thread: threads)
        {
            thread.join();
        }
    }
};
std::async():
int main(int argc, char* argv[])
{
    std::string data = Sock::commonSetUp(argc, argv);
    int finished = 0;
    Sock::ServerSocket server(8080);
    FutureQueue future(finished);
    while(!finished)
    {
        Sock::DataSocket accept = server.accept();
        future.addFuture([accept = std::move(accept), &server, &data, &finished]() mutable {Sock::worker(std::move(accept), server, data, finished);});
    }
}
Auxiliary class to tidy up the future.
class FutureQueue
{
    using MyFuture   = std::future<void>;
    using FutureList = std::list<MyFuture>;

    int&                        finished;
    FutureList                  futures;
    std::mutex                  mutex;
    std::condition_variable     cond;
    std::thread                 cleaner;

    void waiter()
    {
        // Keep reaping futures until the server is told to finish.
        while(!finished)
        {
            std::future<void>   next;
            {
                std::unique_lock<std::mutex> lock(mutex);
                cond.wait(lock, [this](){return !(this->futures.empty() && !this->finished);});
                // Only take a future if there is one to take.
                if (!futures.empty() && !finished)
                {
                    next = std::move(futures.front());
                    futures.pop_front();
                }
            }
            // Wait outside the lock so addFuture() is not blocked by a running job.
            if (next.valid())
            {
                next.wait();
            }
        }
    }
public:
    FutureQueue(int& finished)
        : finished(finished)
        , cleaner(&FutureQueue::waiter, this)
    {}
    ~FutureQueue()
    {
        // Wake the cleaner in case it is waiting on an empty list.
        cond.notify_all();
        cleaner.join();
    }
    template<typename T>
    void addFuture(T&& lambda)
    {
        std::unique_lock<std::mutex> lock(mutex);
        futures.push_back(std::async(std::launch::async, std::move(lambda)));
        cond.notify_one();
    }
};
This application is certainly going to be I/O-bound and not CPU-bound, meaning that the vast majority of the time spent processing any single request is spent waiting in blocking I/O operations, not actually doing computation.
So having more threads (up to a point, but likely past 256 of them) is going to be faster, because it allows more concurrent I/O on different sockets as they all swap off the CPUs.
In other words, it's not the 8 cores that's the bottleneck, it's the socket communication. So you want to parallelize that as much as possible (or use non-blocking I/O).