Question:
3 while loops below contain code that has been commented out. I search for ("TAG1", "TAG2", and "TAG3") for easy identification. I simply want the while loops to wait on the condition tested to become true before proceeding while minimizing CPU resources as much as possible. I first tried using Boost condition variables, but there's a race condition. Putting the thread to sleep for 'x' microseconds is inefficient because there is no way to precisely time the wakeup. Finally, boost::this_thread::yield() does not seem to do anything. Probably because I only have 2 active threads on a dual-core system. Specifically, how can I make the three tagged areas below run more efficiently while introducing as little unnecessary blocking as possible.
BACKGROUND
Objective:
I have an application that logs a lot of data. After profiling, I found that much time is consumed on the logging operations (logging text or binary to a file on the local hard disk). My objective is to reduce the latency on logData calls by replacing non-threaded direct write calls with calls to a threaded buffered stream logger.
Options Explored:
Design:
Results:
Compiles with current version of Boost (1.55)
Header
#ifndef BufferedLogStream_h
#define BufferedLogStream_h
#include <stdio.h>
#include <iostream>
#include <iostream>
#include <cstdlib>
#include "boost\chrono\chrono.hpp"
#include "boost\thread\thread.hpp"
#include "boost\thread\locks.hpp"
#include "boost\thread\mutex.hpp"
#include "boost\thread\condition_variable.hpp"
#include <time.h>
using namespace std;
#define BENCHMARK_STR_SIZE 128
#define NUM_BENCHMARK_WRITES 524288
#define TEST 0
#define BENCHMARK 1
#define WORKER_LOOP_WAIT_MICROSEC 20
#define MAIN_LOOP_WAIT_MICROSEC 10
#if(TEST)
#define BUFFER_SIZE 10
#else
#define BUFFER_SIZE 33554432 //4 MB
#endif
class BufferedLogStream {
public:
BufferedLogStream();
void openFile(char* filename);
void flush();
void close();
inline void writeMessage(const char* message, unsigned int length);
void writeMessage(string message);
bool operator() () { return start != end; }
private:
void threadedWriter();
inline bool hasSomethingToWrite();
inline unsigned int getFreeSpaceInBuffer();
void appendStringToBuffer(const char* message, unsigned int length);
FILE* fp;
char* start;
char* end;
char* endofringbuffer;
char ringbuffer[BUFFER_SIZE];
bool workerthreadkeepalive;
boost::mutex mtx;
boost::condition_variable waitforempty;
boost::mutex workmtx;
boost::condition_variable waitforwork;
#if(TEST)
struct testbuffer {
int length;
char message[BUFFER_SIZE * 2];
};
public:
void test();
private:
void getNextRandomTest(testbuffer &tb);
FILE* datatowrite;
#endif
#if(BENCHMARK)
public:
void runBenchmark();
private:
void initBenchmarkString();
void runDirectWriteBaseline();
void runBufferedWriteBenchmark();
char benchmarkstr[BENCHMARK_STR_SIZE];
#endif
};
#if(TEST)
int main() {
BufferedLogStream* bl = new BufferedLogStream();
bl->openFile("replicated.txt");
bl->test();
bl->close();
cout << "Done" << endl;
cin.get();
return 0;
}
#endif
#if(BENCHMARK)
int main() {
BufferedLogStream* bl = new BufferedLogStream();
bl->runBenchmark();
cout << "Done" << endl;
cin.get();
return 0;
}
#endif //for benchmark
#endif
Implementation
#include "BufferedLogStream.h"
BufferedLogStream::BufferedLogStream() {
fp = NULL;
start = ringbuffer;
end = ringbuffer;
endofringbuffer = ringbuffer + BUFFER_SIZE;
workerthreadkeepalive = true;
}
void BufferedLogStream::openFile(char* filename) {
if(fp) close();
workerthreadkeepalive = true;
boost::thread t2(&BufferedLogStream::threadedWriter, this);
fp = fopen(filename, "w+b");
}
void BufferedLogStream::flush() {
fflush(fp);
}
void BufferedLogStream::close() {
workerthreadkeepalive = false;
if(!fp) return;
while(hasSomethingToWrite()) {
boost::unique_lock<boost::mutex> u(mtx);
waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
}
flush();
fclose(fp);
fp = NULL;
}
void BufferedLogStream::threadedWriter() {
while(true) {
if(start != end) {
char* currentend = end;
if(start < currentend) {
fwrite(start, 1, currentend - start, fp);
}
else if(start > currentend) {
if(start != endofringbuffer) fwrite(start, 1, endofringbuffer - start, fp);
fwrite(ringbuffer, 1, currentend - ringbuffer, fp);
}
start = currentend;
waitforempty.notify_one();
}
else { //start == end...no work to do
if(!workerthreadkeepalive) return;
boost::unique_lock<boost::mutex> u(workmtx);
waitforwork.wait_for(u, boost::chrono::microseconds(WORKER_LOOP_WAIT_MICROSEC));
}
}
}
bool BufferedLogStream::hasSomethingToWrite() {
return start != end;
}
void BufferedLogStream::writeMessage(string message) {
writeMessage(message.c_str(), message.length());
}
unsigned int BufferedLogStream::getFreeSpaceInBuffer() {
if(end > start) return (start - ringbuffer) + (endofringbuffer - end) - 1;
if(end == start) return BUFFER_SIZE-1;
return start - end - 1; //case where start > end
}
void BufferedLogStream::appendStringToBuffer(const char* message, unsigned int length) {
if(end + length <= endofringbuffer) { //most common case for appropriately-sized buffer
memcpy(end, message, length);
end += length;
}
else {
int lengthtoendofbuffer = endofringbuffer - end;
if(lengthtoendofbuffer > 0) memcpy(end, message, lengthtoendofbuffer);
int remainderlength = length - lengthtoendofbuffer;
memcpy(ringbuffer, message + lengthtoendofbuffer, remainderlength);
end = ringbuffer + remainderlength;
}
}
void BufferedLogStream::writeMessage(const char* message, unsigned int length) {
if(length > BUFFER_SIZE - 1) { //if string is too large for buffer, wait for buffer to empty and bypass buffer, write directly to file
while(hasSomethingToWrite()); {
boost::unique_lock<boost::mutex> u(mtx);
waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
}
fwrite(message, 1, length, fp);
}
else {
//wait until there is enough free space to insert new string
while(getFreeSpaceInBuffer() < length) {
boost::unique_lock<boost::mutex> u(mtx);
waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
}
appendStringToBuffer(message, length);
}
waitforwork.notify_one();
}
#if(TEST)
void BufferedLogStream::getNextRandomTest(testbuffer &tb) {
tb.length = 1 + (rand() % (int)(BUFFER_SIZE * 1.05));
for(int i = 0; i < tb.length; i++) {
tb.message[i] = rand() % 26 + 65;
}
tb.message[tb.length] = '\n';
tb.length++;
tb.message[tb.length] = '\0';
}
void BufferedLogStream::test() {
cout << "Buffer size is: " << BUFFER_SIZE << endl;
testbuffer tb;
datatowrite = fopen("orig.txt", "w+b");
for(unsigned int i = 0; i < 7000000; i++) {
if(i % 1000000 == 0) cout << i << endl;
getNextRandomTest(tb);
writeMessage(tb.message, tb.length);
fwrite(tb.message, 1, tb.length, datatowrite);
}
fflush(datatowrite);
fclose(datatowrite);
}
#endif
#if(BENCHMARK)
void BufferedLogStream::initBenchmarkString() {
for(unsigned int i = 0; i < BENCHMARK_STR_SIZE - 1; i++) {
benchmarkstr[i] = rand() % 26 + 65;
}
benchmarkstr[BENCHMARK_STR_SIZE - 1] = '\n';
}
void BufferedLogStream::runDirectWriteBaseline() {
clock_t starttime = clock();
fp = fopen("BenchMarkBaseline.txt", "w+b");
for(unsigned int i = 0; i < NUM_BENCHMARK_WRITES; i++) {
fwrite(benchmarkstr, 1, BENCHMARK_STR_SIZE, fp);
}
fflush(fp);
fclose(fp);
clock_t elapsedtime = clock() - starttime;
cout << "Direct write baseline took " << ((double) elapsedtime) / CLOCKS_PER_SEC << " seconds." << endl;
}
void BufferedLogStream::runBufferedWriteBenchmark() {
clock_t starttime = clock();
openFile("BufferedBenchmark.txt");
cout << "Opend file" << endl;
for(unsigned int i = 0; i < NUM_BENCHMARK_WRITES; i++) {
writeMessage(benchmarkstr, BENCHMARK_STR_SIZE);
}
cout << "Wrote" << endl;
close();
cout << "Close" << endl;
clock_t elapsedtime = clock() - starttime;
cout << "Buffered write took " << ((double) elapsedtime) / CLOCKS_PER_SEC << " seconds." << endl;
}
void BufferedLogStream::runBenchmark() {
cout << "Buffer size is: " << BUFFER_SIZE << endl;
initBenchmarkString();
runDirectWriteBaseline();
runBufferedWriteBenchmark();
}
#endif
Update: November 25, 2013
I updated the code below use boost::condition_variables, specifically the wait_for() method as recommended by Evgeny Panasyuk. This avoids unnecessarily checking the same condition over and over again. I am currently seeing the buffered version run in about 1/6th the time as the unbuffered / direct-write version. This is not the ideal case because both cases are limited by the hard disk (in my case a 2010 era SSD). I plan to use the code below in an environment where the hard disk will not be the bottleneck and most if not all the time, the buffer should have space available to accommodate the writeMessage requests. That brings me to my next question. How big should I make the buffer? I don't mind allocating 32 MBs or 64 MB to ensure that it never fills up. The code will be running on systems that can spare that. Intuitively, I feel that it's a bad idea to statically allocate a 32 MB character array. Is it? Anyhow, I expect that when I run the code below for my intended application, the latency of logData() calls will be greatly reduced which will yield a significant reduction in overall processing time.
If anyone sees any way to make the code below better (faster, more robust, leaner, etc), please let me know. I appreciate the feedback. Lazin, how would your approach be faster or more efficient than what I have posted below? I kinda like the idea of just having one buffer and making it large enough so that it practically never fills up. Then I don't have to worry about reading from different buffers. Evgeny Panasyuk, I like the approach of using existing code whenever possible, especially if it's an existing boost library. However, I also don't see how the spcs_queue is more efficient than what I have below. I'd rather deal with one large buffer than many smaller ones and have to worry about splitting splitting my input stream on the input and splicing it back together on the output. Your approach would allow me to offload the formatting from the main thread onto the worker thread. That is a cleaver approach. But I'm not sure yet whether it will save a lot of time and to realize the full benefit, I would have to modify code that I do not own.
//End Update
General solution.
I think you must look at the Naggle algorithm. For one producer and one consumer this would look like this:
This schema will help achieve low latency requirement, single message will be written to disc instantaneously, but large amount of events will be written by large batches for greather throughput.
If your log messages have levels - you can improve this schema a little bit. All error messages have high priority(level) and must be saved on disc immediately (because they are rare but very valuable) but debug and trace messages have low priority and can be buffered to save bandwidth (because they are very frequent but not as valuable as error and info messages). So when you write error
message, you must wait until worker thread is done writing your message (and all messages that are in the same buffer) and then continue, but debug and trace messages can be just written to buffer.
Threading.
Spawning worker thread for each application thread is to costly. You must use single writer thread for each log file. Write buffers must be shared between threads. Each buffer must have two pointers - commit_pointer
and prepare_pointer
. All buffer space between beginning of the buffer and commit_pointer
are available for worker thread. Buffer space between commit_pointer
and prepare_pointer
are currently updated by application threads. Invariant: commit_pointer
<= prepare_pointer
.
Write operations can be performed in two steps.
prepare_pointer
;prepare_pointer
value and len is saved by consumer;commit_pointer
is equal to old prepare_pointer
value that its save in local variable.commit_pointer
= commit_pointer
+ len atomically.To prevent false sharing, len(message) can be rounded to cache line size and all extra space can be filled with spaces.
// pseudocode
void write(const char* message) {
int len = strlen(message); // TODO: round to cache line size
const char* old_prepare_ptr;
// Prepare step
while(1)
{
old_prepare_ptr = prepare_ptr;
if (
CAS(&prepare_ptr,
old_prepare_ptr,
prepare_ptr + len) == old_prepare_ptr
)
break;
// retry if another thread perform prepare op.
}
// Write message
memcpy((void*)old_prepare_ptr, (void*)message, len);
// Commit step
while(1)
{
const char* old_commit_ptr = commit_ptr;
if (
CAS(&commit_ptr,
old_commit_ptr,
old_commit_ptr + len) == old_commit_ptr
)
break;
// retry if another thread commits
}
notify_worker_thread();
}
concurrent_queue<T, Size>
The question that I have is how to make the worker thread go to work as soon as there is work to do and sleep when there is no work.
There is boost::lockfree::spsc_queue
- wait-free single-producer single-consumer queue. It can be configured to have compile-time capacity (the size of the internal ringbuffer).
From what I understand, you want something similar to following configuration:
template<typename T, size_t N>
class concurrent_queue
{
// T can be wrapped into struct with padding in order to avoid false sharing
mutable boost::lockfree::spsc_queue<T, boost::lockfree::capacity<N>> q;
mutable mutex m;
mutable condition_variable c;
void wait() const
{
unique_lock<mutex> u(m);
c.wait_for(u, chrono::microseconds(1)); // Or whatever period you need.
// Timeout is required, because modification happens not under mutex
// and notification can be lost.
// Another option is just to use sleep/yield, without notifications.
}
void notify() const
{
c.notify_one();
}
public:
void push(const T &t)
{
while(!q.push(t))
wait();
notify();
}
void pop(T &result)
{
while(!q.pop(result))
wait();
notify();
}
};
When there are elements in queue - pop
does not block. And when there is enough space in internal buffer - push
does not block.
concurrent<T>
I want to reduce both formatting and write times as much as possible so I plan to reduce both.
Check out Herb Sutter talk at C++ and Beyond 2012: C++ Concurrency. At page 14 he shows example of concurrent<T>
. Basically it is wrapper around object of type T
which starts separate thread for performing all operations on that object. Usage is:
concurrent<ostream*> x(&cout); // starts thread internally
// ...
// x acts as function object.
// It's function call operator accepts action
// which is performed on wrapped object in separate thread.
int i = 42;
x([i](ostream *out){ *out << "i=" << i; }); // passing lambda as action
You can use similar pattern in order to offload all formatting work to consumer thread.
Otherwise, new buffers are allocated and I want to avoid memory allocation after the buffer stream is constructed.
Above concurrent_queue<T, Size>
example uses fixed-size buffer which is fully contained within queue, and does not imply additional allocations.
However, Herb's concurrent<T>
example uses std::function
to pass action into worker thread. That may incur costly allocation.
std::function
implementations may use Small Object Optimization (and most implementations do) - small function objects are in-place copy-constructed in internal buffer, but there is no guarantee, and for function objects bigger than threshold - heap allocation would happen.
There are several options to avoid this allocation:
Implement std::function
analog with internal buffer large enough to hold target function objects (for example, you can try to modify boost::function
or this version).
Use your own function object which would represent all type of log messages. Basically it would contain just values required to format message. As potentially there are different types of messages, consider to use boost::variant
(which is literary union coupled with type tag) to represent them.
Putting it all together, here is proof-of-concept (using second option):
LIVE DEMO
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/optional.hpp>
#include <boost/variant.hpp>
#include <condition_variable>
#include <iostream>
#include <cstddef>
#include <thread>
#include <chrono>
#include <mutex>
using namespace std;
/*********************************************/
template<typename T, size_t N>
class concurrent_queue
{
mutable boost::lockfree::spsc_queue<T, boost::lockfree::capacity<N>> q;
mutable mutex m;
mutable condition_variable c;
void wait() const
{
unique_lock<mutex> u(m);
c.wait_for(u, chrono::microseconds(1));
}
void notify() const
{
c.notify_one();
}
public:
void push(const T &t)
{
while(!q.push(t))
wait();
notify();
}
void pop(T &result)
{
while(!q.pop(result))
wait();
notify();
}
};
/*********************************************/
template<typename T, typename F>
class concurrent
{
typedef boost::optional<F> Job;
mutable concurrent_queue<Job, 16> q; // use custom size
mutable T x;
thread worker;
public:
concurrent(T x)
: x{x}, worker{[this]
{
Job j;
while(true)
{
q.pop(j);
if(!j) break;
(*j)(this->x); // you may need to handle exceptions in some way
}
}}
{}
void operator()(const F &f)
{
q.push(Job{f});
}
~concurrent()
{
q.push(Job{});
worker.join();
}
};
/*********************************************/
struct LogEntry
{
struct Formatter
{
typedef void result_type;
ostream *out;
void operator()(double x) const
{
*out << "floating point: " << x << endl;
}
void operator()(int x) const
{
*out << "integer: " << x << endl;
}
};
boost::variant<int, double> data;
void operator()(ostream *out)
{
boost::apply_visitor(Formatter{out}, data);
}
};
/*********************************************/
int main()
{
concurrent<ostream*, LogEntry> log{&cout};
for(int i=0; i!=1024; ++i)
{
log({i});
log({i/10.});
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With