 

C++ Low-Latency Threaded Asynchronous Buffered Stream (intended for logging) – Boost

Question:

Three while loops below contain code that has been commented out. They can be found by searching for "TAG1", "TAG2", and "TAG3". I simply want each while loop to wait for its tested condition to become true before proceeding, while using as few CPU resources as possible. I first tried Boost condition variables, but there is a race condition. Putting the thread to sleep for 'x' microseconds is inefficient because there is no way to time the wakeup precisely. Finally, boost::this_thread::yield() does not seem to do anything, probably because I only have 2 active threads on a dual-core system. Specifically, how can I make the three tagged areas below run more efficiently while introducing as little unnecessary blocking as possible?

BACKGROUND

Objective:

I have an application that logs a lot of data. After profiling, I found that much time is consumed on the logging operations (logging text or binary to a file on the local hard disk). My objective is to reduce the latency on logData calls by replacing non-threaded direct write calls with calls to a threaded buffered stream logger.

Options Explored:

  • Upgrade 2005-era slow hard disk to SSD...possible. Cost is not prohibitive...but involves a lot of work... more than 200 computers would have to be upgraded...
  • Boost ASIO...I don't need all the proactor / networking overhead, looking for something simpler and more light-weight.

Design:

  • Producer and consumer thread pattern, the application writes data into a buffer and a background thread then writes it to disk sometime later. So the ultimate goal is to have the writeMessage function called by the application layer return as fast as possible while data is correctly / completely logged to the log file in a FIFO order sometime later.
  • Only one application thread, only one writer thread.
  • Based on ring buffer. The reason for this decision is to use as few locks as possible and ideally...and please correct me if I'm wrong...I don't think I need any.
  • Buffer is a statically-allocated character array, but could move it to the heap if needed / desired for performance reasons.
  • Buffer has a start pointer that points to the next character that should be written to the file. Buffer has an end pointer that points to the array index after the last character to be written to the file. The end pointer NEVER passes the start pointer. If a message comes in that is larger than the buffer, then the writer waits until the buffer is emptied and writes the new message to the file directly without putting the over-sized message in the buffer (once the buffer is emptied, the worker thread won't be writing anything so no contention).
  • The writer (worker thread) only updates the ring buffer's start pointer.
  • The main (application thread) only updates the ring buffer's end pointer, and again, it only inserts new data into the buffer when there is available space...otherwise it either waits for space in the buffer to become available or writes directly as described above.
  • The worker thread continuously checks to see if there is data to be written (indicated by the case when the buffer start pointer != buffer end pointer). If there is no data to be written, the worker thread should ideally go to sleep and wake up once the application thread has inserted something into the buffer (and changed the buffer's end pointer such that it no longer points to the same index as the start pointer). What I have below involves while loops continuously checking that condition. It is a very bad / inefficient way of waiting on the buffer.
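
The ownership rules in the list above (only the application thread moves the end pointer, only the worker thread moves the start pointer) describe a single-producer / single-consumer ring buffer. The following is a minimal sketch of that idea, not the posted implementation: it assumes C++11 `std::atomic` is available and uses indices instead of raw `char*` members, since plain pointers shared between threads are not guaranteed to be safe by the language. The names `SpscByteRing`, `try_write` and `read_some` are hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstring>

// Hypothetical single-producer / single-consumer byte ring.
// One slot always stays unused so that head == tail means "empty".
template <std::size_t N>
class SpscByteRing {
    char buf_[N];
    std::atomic<std::size_t> head_{0}; // next byte to read; stored only by the consumer
    std::atomic<std::size_t> tail_{0}; // next byte to write; stored only by the producer

public:
    // Producer side: copies len bytes if they fit, otherwise returns false.
    bool try_write(const char* data, std::size_t len) {
        assert(len < N); // larger messages must bypass the ring
        const std::size_t tail = tail_.load(std::memory_order_relaxed);
        const std::size_t head = head_.load(std::memory_order_acquire);
        const std::size_t free_space = (head + N - tail - 1) % N;
        if (len > free_space) return false;
        const std::size_t first = (len < N - tail) ? len : N - tail;
        std::memcpy(buf_ + tail, data, first);        // up to the end of the array
        std::memcpy(buf_, data + first, len - first); // wrapped remainder, may be 0 bytes
        // Release: the copied bytes become visible before the new tail does.
        tail_.store((tail + len) % N, std::memory_order_release);
        return true;
    }

    // Consumer side: copies up to max bytes into out and returns the count.
    std::size_t read_some(char* out, std::size_t max) {
        const std::size_t head = head_.load(std::memory_order_relaxed);
        const std::size_t tail = tail_.load(std::memory_order_acquire);
        std::size_t avail = (tail + N - head) % N;
        if (avail > max) avail = max;
        const std::size_t first = (avail < N - head) ? avail : N - head;
        std::memcpy(out, buf_ + head, first);
        std::memcpy(out + first, buf_, avail - first);
        // Release: hands the consumed space back to the producer.
        head_.store((head + avail) % N, std::memory_order_release);
        return avail;
    }
};
```

Because each index has exactly one writer, no lock is needed for the data transfer itself; a lock or condition variable is only needed if a thread should sleep while the buffer is empty or full.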

Results:

  • On my 2009-era dual-core laptop with SSD, the total write time of the threaded / buffered benchmark vs. direct write is about 1 : 6 (0.095 sec vs. 0.609 sec), but highly variable. Often the buffered write benchmark is actually slower than direct write. I believe the variability is due to the poor implementation of the three waits: waiting for space to free up in the buffer, waiting for the buffer to empty, and having the worker thread wait for work to become available. I have measured that some of the while loops consume over 10000 cycles, and I suspect that those cycles compete for hardware resources that the other thread (worker or application) needs to finish the computation being waited on.
  • Output seems to check out. With TEST mode enabled and a small buffer size of 10 as a stress test, I diffed hundreds of MBs of output and found it to equal the input.
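
One possible contributor to the variability in these numbers is the timer itself. The benchmark uses `clock()`, which the C standard defines as processor time; some runtimes (Microsoft's, as far as I know) report elapsed wall-clock time instead, so results may not be comparable across platforms, and with two threads processor time can differ from elapsed time. A hedged sketch of a wall-clock measurement, assuming C++11 `<chrono>` is available (Boost.Chrono offers an equivalent `steady_clock`); the helper name `time_seconds` is hypothetical:

```cpp
#include <cassert>
#include <chrono>

// Hypothetical helper: runs work() once and returns the elapsed wall-clock
// time in seconds, measured with a monotonic clock.
template <typename Work>
double time_seconds(Work work) {
    const std::chrono::steady_clock::time_point t0 = std::chrono::steady_clock::now();
    work();
    const std::chrono::steady_clock::time_point t1 = std::chrono::steady_clock::now();
    assert(t1 >= t0); // steady_clock never goes backwards
    return std::chrono::duration<double>(t1 - t0).count();
}
```

Each benchmark body (the direct-write loop, or the buffered loop including close()) could be passed to this helper as a lambda.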

Compiles with current version of Boost (1.55)

Header

    #ifndef BufferedLogStream_h
    #define BufferedLogStream_h

    #include <stdio.h>
    #include <iostream>
    #include <cstring>
    #include <cstdlib>
    #include "boost/chrono/chrono.hpp"
    #include "boost/thread/thread.hpp"
    #include "boost/thread/locks.hpp"
    #include "boost/thread/mutex.hpp"
    #include "boost/thread/condition_variable.hpp"
    #include <time.h>

    using namespace std;

    #define BENCHMARK_STR_SIZE 128
    #define NUM_BENCHMARK_WRITES 524288
    #define TEST 0
    #define BENCHMARK 1
    #define WORKER_LOOP_WAIT_MICROSEC 20
    #define MAIN_LOOP_WAIT_MICROSEC 10

    #if(TEST)
    #define BUFFER_SIZE 10 
    #else 
    #define BUFFER_SIZE 33554432 //32 MB
    #endif

    class BufferedLogStream {
        public:
            BufferedLogStream();
            void openFile(char* filename);
            void flush();
            void close();
            inline void writeMessage(const char* message, unsigned int length);
            void writeMessage(string message);
            bool operator() () { return start != end; }

        private:
            void threadedWriter();
            inline bool hasSomethingToWrite();
            inline unsigned int getFreeSpaceInBuffer();
            void appendStringToBuffer(const char* message, unsigned int length);

            FILE* fp;
            char* start;
            char* end;
            char* endofringbuffer;
            char ringbuffer[BUFFER_SIZE];
            bool workerthreadkeepalive;
            boost::mutex mtx;
            boost::condition_variable waitforempty;
            boost::mutex workmtx;
            boost::condition_variable waitforwork;

            #if(TEST)
            struct testbuffer {
                int length;
                char message[BUFFER_SIZE * 2];
            };

            public:
                void test();

            private:
                void getNextRandomTest(testbuffer &tb);
                FILE* datatowrite;
            #endif

        #if(BENCHMARK)
            public:
                void runBenchmark();

            private:
                void initBenchmarkString();
                void runDirectWriteBaseline();
                void runBufferedWriteBenchmark();

                char benchmarkstr[BENCHMARK_STR_SIZE];
        #endif
    };

    #if(TEST)
    int main() {
        BufferedLogStream* bl = new BufferedLogStream();
        bl->openFile("replicated.txt");
        bl->test();
        bl->close();
        cout << "Done" << endl;
        cin.get();
        return 0;
    }
    #endif

    #if(BENCHMARK)
    int main() {
        BufferedLogStream* bl = new BufferedLogStream();
        bl->runBenchmark();
        cout << "Done" << endl;
        cin.get();
        return 0;
    }
    #endif //for benchmark

    #endif

Implementation

    #include "BufferedLogStream.h"

    BufferedLogStream::BufferedLogStream() {
        fp = NULL;
        start = ringbuffer;
        end = ringbuffer;
        endofringbuffer = ringbuffer + BUFFER_SIZE;
        workerthreadkeepalive = true;
    }

    void BufferedLogStream::openFile(char* filename) {
        if(fp) close();
        workerthreadkeepalive = true;
        boost::thread t2(&BufferedLogStream::threadedWriter, this);
        fp = fopen(filename, "w+b");
    }

    void BufferedLogStream::flush() {
        fflush(fp); 
    }

    void BufferedLogStream::close() {
        workerthreadkeepalive = false;
        if(!fp) return;
        while(hasSomethingToWrite()) {
            boost::unique_lock<boost::mutex> u(mtx);
            waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
        }
        flush();        
        fclose(fp);             
        fp = NULL;          
    }

    void BufferedLogStream::threadedWriter() {
        while(true) {
            if(start != end) {
                char* currentend = end;
                if(start < currentend) {
                    fwrite(start, 1, currentend - start, fp);
                }
                else if(start > currentend) {
                    if(start != endofringbuffer) fwrite(start, 1, endofringbuffer - start, fp); 
                    fwrite(ringbuffer, 1, currentend - ringbuffer, fp);
                }
                start = currentend;
                waitforempty.notify_one();
            }
            else { //start == end...no work to do
                if(!workerthreadkeepalive) return;
                boost::unique_lock<boost::mutex> u(workmtx);
                waitforwork.wait_for(u, boost::chrono::microseconds(WORKER_LOOP_WAIT_MICROSEC));
            }
        }
    }

    bool BufferedLogStream::hasSomethingToWrite() {
        return start != end;
    }

    void BufferedLogStream::writeMessage(string message) {
        writeMessage(message.c_str(), message.length());
    }

    unsigned int BufferedLogStream::getFreeSpaceInBuffer() {
        if(end > start) return (start - ringbuffer) + (endofringbuffer - end) - 1;
        if(end == start) return BUFFER_SIZE-1;
        return start - end - 1; //case where start > end
    }

    void BufferedLogStream::appendStringToBuffer(const char* message, unsigned int length) {
        if(end + length <= endofringbuffer) { //most common case for appropriately-sized buffer
            memcpy(end, message, length);
            end += length;
        }
        else {
            int lengthtoendofbuffer = endofringbuffer - end;
            if(lengthtoendofbuffer > 0) memcpy(end, message, lengthtoendofbuffer);
            int remainderlength =  length - lengthtoendofbuffer;
            memcpy(ringbuffer, message + lengthtoendofbuffer, remainderlength);
            end = ringbuffer + remainderlength;
        }
    }

    void BufferedLogStream::writeMessage(const char* message, unsigned int length) {
        if(length > BUFFER_SIZE - 1) { //if string is too large for buffer, wait for buffer to empty and bypass buffer, write directly to file
            while(hasSomethingToWrite()) {
                boost::unique_lock<boost::mutex> u(mtx);
                waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
            }
            fwrite(message, 1, length, fp);
        }
        else {
            //wait until there is enough free space to insert new string
            while(getFreeSpaceInBuffer() < length) {
                boost::unique_lock<boost::mutex> u(mtx);
                waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
            }
            appendStringToBuffer(message, length);
        }
        waitforwork.notify_one();
    }

    #if(TEST)
        void BufferedLogStream::getNextRandomTest(testbuffer &tb) {
            tb.length = 1 + (rand() % (int)(BUFFER_SIZE * 1.05));
            for(int i = 0; i < tb.length; i++) {
                tb.message[i] = rand() % 26 + 65;
            }
            tb.message[tb.length] = '\n';
            tb.length++;
            tb.message[tb.length] = '\0';
        }

        void BufferedLogStream::test() {
            cout << "Buffer size is: " << BUFFER_SIZE << endl;
            testbuffer tb;
            datatowrite = fopen("orig.txt", "w+b");
            for(unsigned int i = 0; i < 7000000; i++) {
                if(i % 1000000 == 0) cout << i << endl;
                getNextRandomTest(tb);
                writeMessage(tb.message, tb.length);
                fwrite(tb.message, 1, tb.length, datatowrite);
            }       
            fflush(datatowrite);
            fclose(datatowrite);
        }
    #endif

    #if(BENCHMARK) 
        void BufferedLogStream::initBenchmarkString() {
            for(unsigned int i = 0; i < BENCHMARK_STR_SIZE - 1; i++) {
                benchmarkstr[i] = rand() % 26 + 65;
            }
            benchmarkstr[BENCHMARK_STR_SIZE - 1] = '\n';
        }

        void BufferedLogStream::runDirectWriteBaseline() {
            clock_t starttime = clock();
            fp = fopen("BenchMarkBaseline.txt", "w+b");
            for(unsigned int i = 0; i < NUM_BENCHMARK_WRITES; i++) {
                fwrite(benchmarkstr, 1, BENCHMARK_STR_SIZE, fp);
            }   
            fflush(fp);
            fclose(fp);
            clock_t elapsedtime = clock() - starttime;
            cout << "Direct write baseline took " << ((double) elapsedtime) / CLOCKS_PER_SEC << " seconds." << endl;
        }

        void BufferedLogStream::runBufferedWriteBenchmark() {
            clock_t starttime = clock();
            openFile("BufferedBenchmark.txt");
            cout << "Opened file" << endl;
            for(unsigned int i = 0; i < NUM_BENCHMARK_WRITES; i++) {
                writeMessage(benchmarkstr, BENCHMARK_STR_SIZE);
            }   
            cout << "Wrote" << endl;
            close();
            cout << "Close" << endl;
            clock_t elapsedtime = clock() - starttime;
            cout << "Buffered write took " << ((double) elapsedtime) / CLOCKS_PER_SEC << " seconds." << endl;
        }

        void BufferedLogStream::runBenchmark() {
            cout << "Buffer size is: " << BUFFER_SIZE << endl;
            initBenchmarkString();
            runDirectWriteBaseline();
            runBufferedWriteBenchmark();
        }
    #endif

Update: November 25, 2013

I updated the code to use boost::condition_variable, specifically the wait_for() method, as recommended by Evgeny Panasyuk. This avoids unnecessarily checking the same condition over and over again. I am currently seeing the buffered version run in about 1/6th the time of the unbuffered / direct-write version. This is not the ideal case because both cases are limited by the hard disk (in my case a 2010-era SSD). I plan to use the code in an environment where the hard disk will not be the bottleneck and where, most if not all of the time, the buffer should have space available to accommodate the writeMessage requests. That brings me to my next question: how big should I make the buffer? I don't mind allocating 32 MB or 64 MB to ensure that it never fills up. The code will be running on systems that can spare that. Intuitively, I feel that it's a bad idea to statically allocate a 32 MB character array. Is it? Anyhow, I expect that when I run the code for my intended application, the latency of logData() calls will be greatly reduced, which will yield a significant reduction in overall processing time.

If anyone sees any way to make the code better (faster, more robust, leaner, etc.), please let me know. I appreciate the feedback. Lazin, how would your approach be faster or more efficient than what I have posted? I kind of like the idea of having just one buffer and making it large enough that it practically never fills up. Then I don't have to worry about reading from different buffers. Evgeny Panasyuk, I like the approach of using existing code whenever possible, especially if it's an existing Boost library. However, I also don't see how the spsc_queue is more efficient than what I have. I'd rather deal with one large buffer than many smaller ones and have to worry about splitting my input stream on the input and splicing it back together on the output. Your approach would allow me to offload the formatting from the main thread onto the worker thread. That is a clever approach. But I'm not sure yet whether it will save a lot of time, and to realize the full benefit I would have to modify code that I do not own.

//End Update
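
On the question of the 32 MB array: because `ringbuffer` is a class member, it lives wherever the `BufferedLogStream` object lives. The posted `main` creates the object with `new`, so the array ends up on the heap; declaring the object as a local variable would place all 32 MB on the stack, which is likely to exceed typical default stack limits. A sketch of one way to make the storage location independent of how the object is created, assuming a single allocation at construction time is acceptable (the class name `HeapRingStorage` is hypothetical):

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical replacement for the fixed-size array member: the storage is
// allocated once in the constructor, so nothing is allocated on the logging
// path afterwards, and the size can be chosen at run time.
class HeapRingStorage {
    std::vector<char> storage_;

public:
    explicit HeapRingStorage(std::size_t bytes) : storage_(bytes) {
        assert(bytes > 1); // one slot stays unused to tell "full" from "empty"
    }
    char* begin() { return storage_.data(); }
    char* end() { return storage_.data() + storage_.size(); }
    std::size_t capacity() const { return storage_.size(); }
};
```

With this layout, `start`, `end` and `endofringbuffer` would be initialized from `begin()` and `end()` instead of from the array.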

486DX2-66 asked Nov 25 '13 07:11


2 Answers

General solution.

I think you should look at Nagle's algorithm. For one producer and one consumer it would look like this:

  • At the beginning the buffer is empty, and the worker thread is idle and waiting for events.
  • The producer writes data to the buffer and notifies the worker thread.
  • The worker thread wakes up and starts the write operation.
  • The producer tries to write another message, but the buffer is in use by the worker, so the producer allocates another buffer and writes the message to it.
  • The producer tries to write another message; I/O is still in progress, so the producer writes the message to the previously allocated buffer.
  • The worker thread finishes writing the first buffer to the file, sees that there is another buffer with data, grabs it and starts to write.
  • The very first buffer is then used by the producer for all subsequent messages while the second write operation is in progress.

This scheme helps meet the low-latency requirement: a single message is written to disk almost immediately, while a large number of events is written in large batches for greater throughput.
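
The buffer hand-off described above can be sketched with two reusable buffers that are swapped between producer and worker. This is a simplified illustration under stated assumptions, not a lock-free implementation: it uses C++11 `std::thread`, protects the front buffer with a mutex, and delivers each batch to a caller-supplied `sink` callback (for example a wrapper around `fwrite`). The class name `BatchingLogger` is hypothetical.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

// Hypothetical double-buffered logger: the producer appends to front_ while
// the worker writes the batch it swapped out earlier.
class BatchingLogger {
    std::function<void(const std::string&)> sink_; // called only by the worker
    std::string front_;                            // guarded by m_
    std::mutex m_;
    std::condition_variable cv_;
    bool stop_ = false;                            // guarded by m_
    std::thread worker_;                           // declared last: started after the rest exists

    void run() {
        std::string back; // batch currently being written, owned by the worker
        std::unique_lock<std::mutex> lock(m_);
        for (;;) {
            cv_.wait(lock, [this] { return stop_ || !front_.empty(); });
            if (front_.empty()) return; // only reachable once stop_ is set
            back.swap(front_);          // take everything queued so far
            lock.unlock();
            sink_(back);                // slow I/O happens outside the lock
            back.clear();               // keeps the capacity for reuse
            lock.lock();
        }
    }

public:
    explicit BatchingLogger(std::function<void(const std::string&)> sink)
        : sink_(std::move(sink)), worker_(&BatchingLogger::run, this) {
        assert(static_cast<bool>(sink_)); // the sink must be callable
    }

    void write(const std::string& msg) {
        {
            std::lock_guard<std::mutex> g(m_);
            front_ += msg;
        }
        cv_.notify_one();
    }

    ~BatchingLogger() {
        {
            std::lock_guard<std::mutex> g(m_);
            stop_ = true;
        }
        cv_.notify_one();
        worker_.join(); // remaining messages are written before the join returns
    }
};
```

A single message is handed to the sink as soon as the worker is free, and messages that arrive during a write are delivered together as one batch.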

If your log messages have levels, you can improve this scheme a little. Error messages have high priority (level) and must be saved to disk immediately, because they are rare but very valuable. Debug and trace messages have low priority and can be buffered to save bandwidth, because they are very frequent but not as valuable as error and info messages. So when you write an error message, you must wait until the worker thread has finished writing your message (and all messages in the same buffer) before continuing, while debug and trace messages can simply be written to the buffer.

Threading.

Spawning a worker thread for each application thread is too costly. You must use a single writer thread for each log file. Write buffers must be shared between threads. Each buffer must have two pointers: commit_pointer and prepare_pointer. All buffer space between the beginning of the buffer and commit_pointer is available to the worker thread. Buffer space between commit_pointer and prepare_pointer is currently being updated by application threads. Invariant: commit_pointer <= prepare_pointer.

Write operations can be performed in two steps.

  1. Prepare write. This operation reserves space in a buffer.
    • The producer calculates len(message) and atomically advances prepare_pointer by that amount;
    • The old prepare_pointer value and len are saved by the producer in local variables;
  2. Commit write.
    • The producer writes the message at the beginning of the reserved buffer space (the old prepare_pointer value).
    • The producer busy-waits until commit_pointer is equal to the old prepare_pointer value that it saved in a local variable.
    • The producer commits the write operation by atomically doing commit_pointer = commit_pointer + len.

To prevent false sharing, len(message) can be rounded up to a multiple of the cache line size, and the extra space can be filled with spaces.
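
The rounding and padding step can be sketched as follows. The 64-byte line size is an assumption (common on x86, not guaranteed), and the helper names `round_up_to_line` and `copy_padded` are hypothetical. Padding only helps against false sharing if the buffer itself starts on a cache line boundary.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>

// Assumed cache line size in bytes.
const std::size_t kCacheLine = 64;

// Rounds len up to the next multiple of line, which must be a power of two.
inline std::size_t round_up_to_line(std::size_t len, std::size_t line = kCacheLine) {
    assert(line != 0 && (line & (line - 1)) == 0);
    return (len + line - 1) & ~(line - 1);
}

// Copies msg into the reserved slot at dst and fills the rest of the slot
// with spaces. Returns the number of bytes the slot occupies.
inline std::size_t copy_padded(char* dst, const char* msg, std::size_t len) {
    const std::size_t padded = round_up_to_line(len);
    std::memcpy(dst, msg, len);
    std::memset(dst + len, ' ', padded - len);
    return padded;
}
```

In the two-step write, the padded size rather than the raw message length would be used when advancing both pointers.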

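// pseudocode: CAS(ptr, expected, desired) atomically stores desired into *ptr
// if *ptr == expected, and returns the value *ptr held before the call.
// NOTE: there is no check for a full buffer or wrap-around here.
void write(const char* message) {
    int len = strlen(message);  // TODO: round up to cache line size
    char* old_prepare_ptr;
    // Prepare step: reserve len bytes by advancing prepare_ptr.
    while(1) 
    {
        old_prepare_ptr = prepare_ptr;
        if (
            CAS(&prepare_ptr, 
                 old_prepare_ptr, 
                 old_prepare_ptr + len) == old_prepare_ptr
            )
            break;
        // retry if another thread performed a prepare op.
    }
    // Write message into the reserved space
    memcpy(old_prepare_ptr, message, len);
    // Commit step: busy-wait until all earlier reservations are committed,
    // i.e. until commit_ptr has reached the start of our reservation.
    while(1)
    {
        if (
             CAS(&commit_ptr, 
                  old_prepare_ptr, 
                  old_prepare_ptr + len) == old_prepare_ptr
            )
            break;
        // retry until earlier writers have committed
    }
    notify_worker_thread();
}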
Evgeny Lazin answered Oct 02 '22 23:10


concurrent_queue<T, Size>

"The question that I have is how to make the worker thread go to work as soon as there is work to do and sleep when there is no work."

There is boost::lockfree::spsc_queue - wait-free single-producer single-consumer queue. It can be configured to have compile-time capacity (the size of the internal ringbuffer).

From what I understand, you want something similar to the following configuration:

template<typename T, size_t N>
class concurrent_queue
{
    // T can be wrapped into struct with padding in order to avoid false sharing
    mutable boost::lockfree::spsc_queue<T, boost::lockfree::capacity<N>> q;
    mutable mutex m;
    mutable condition_variable c;

    void wait() const
    {
        unique_lock<mutex> u(m);
        c.wait_for(u, chrono::microseconds(1)); // Or whatever period you need.
        // Timeout is required, because modification happens not under mutex
        //     and notification can be lost.
        // Another option is just to use sleep/yield, without notifications.
    }
    void notify() const
    {
        c.notify_one();
    }
public:
    void push(const T &t)
    {
        while(!q.push(t))
            wait();
        notify();
    }
    void pop(T &result)
    {
        while(!q.pop(result))
            wait();
        notify();
    }
};

When there are elements in queue - pop does not block. And when there is enough space in internal buffer - push does not block.
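
The timeout in wait() is needed because the queue is modified outside the mutex, so a notification can arrive between the failed pop and the sleep. If briefly taking a mutex on the producer side is acceptable, the wake-up state can be kept under that mutex instead, and the predicate form of `wait` then cannot miss a notification. A minimal sketch of this alternative (it is not part of the answer's design; the class name `WorkSignal` is hypothetical):

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical wake-up helper: pending_ is only modified under m_, so a
// notification cannot slip in between the predicate check and the sleep.
class WorkSignal {
    std::mutex m_;
    std::condition_variable cv_;
    std::size_t pending_ = 0;

public:
    // Producer: call after the item has been pushed into the queue.
    void post() {
        {
            std::lock_guard<std::mutex> g(m_);
            ++pending_;
        }
        cv_.notify_one();
    }

    // Consumer: blocks without a timeout until at least one item was posted,
    // then claims all posted items and returns how many there were.
    std::size_t wait_and_take() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return pending_ != 0; });
        const std::size_t n = pending_;
        assert(n != 0);
        pending_ = 0;
        return n;
    }
};
```

The trade-off is one short lock per push in exchange for a consumer that sleeps until there is work instead of polling every few microseconds.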


concurrent<T>

"I want to reduce both formatting and write times as much as possible so I plan to reduce both."

Check out Herb Sutter's talk at C++ and Beyond 2012: C++ Concurrency. On page 14 he shows an example of concurrent<T>. Basically, it is a wrapper around an object of type T which starts a separate thread for performing all operations on that object. Usage is:

concurrent<ostream*> x(&cout); // starts thread internally
// ...
// x acts as function object.
// Its function call operator accepts an action
//   which is performed on wrapped object in separate thread.
int i = 42;
x([i](ostream *out){ *out << "i=" << i; }); // passing lambda as action

You can use a similar pattern to offload all formatting work to the consumer thread.


Small Object Optimization

"Otherwise, new buffers are allocated and I want to avoid memory allocation after the buffer stream is constructed."

The concurrent_queue<T, Size> example above uses a fixed-size buffer which is fully contained within the queue and does not require additional allocations.

However, Herb's concurrent<T> example uses std::function to pass the action to the worker thread. That may incur a costly allocation.

std::function implementations may use the Small Object Optimization (and most implementations do): small function objects are copy-constructed in place in an internal buffer. But there is no guarantee, and for function objects bigger than the threshold a heap allocation would happen.

There are several options to avoid this allocation:

  1. Implement an analog of std::function with an internal buffer large enough to hold the target function objects (for example, you can try to modify boost::function or this version).

  2. Use your own function object which would represent all types of log messages. Basically, it would contain just the values required to format the message. As there are potentially different types of messages, consider using boost::variant (which is literally a union coupled with a type tag) to represent them.
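
The first option can be sketched as a small wrapper that stores the callable inside itself and refuses, at compile time, anything that does not fit. This is an illustrative sketch under stated assumptions, not a drop-in replacement for std::function: the signature is fixed to `void(std::ostream*)`, only copying is supported, and the name `InplaceAction` is hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <new>
#include <ostream>
#include <type_traits>
#include <utility>

// Hypothetical fixed-capacity stand-in for std::function<void(std::ostream*)>.
// Oversized callables fail to compile instead of falling back to the heap.
template <std::size_t Capacity>
class InplaceAction {
    alignas(std::max_align_t) unsigned char storage_[Capacity];
    void (*call_)(void*, std::ostream*) = nullptr;
    void (*copy_)(void*, const void*) = nullptr;
    void (*destroy_)(void*) = nullptr;

    void reset() {
        if (destroy_) destroy_(storage_);
        call_ = nullptr; copy_ = nullptr; destroy_ = nullptr;
    }
    void copy_from(const InplaceAction& other) {
        if (other.copy_) other.copy_(storage_, other.storage_);
        call_ = other.call_; copy_ = other.copy_; destroy_ = other.destroy_;
    }

public:
    InplaceAction() = default;

    template <typename F, typename = typename std::enable_if<
        !std::is_same<typename std::decay<F>::type, InplaceAction>::value>::type>
    InplaceAction(F&& f) {
        typedef typename std::decay<F>::type Fn;
        static_assert(sizeof(Fn) <= Capacity, "callable does not fit in the buffer");
        static_assert(alignof(Fn) <= alignof(std::max_align_t), "callable is over-aligned");
        new (storage_) Fn(std::forward<F>(f)); // placement new: no heap allocation
        call_ = [](void* p, std::ostream* out) { (*static_cast<Fn*>(p))(out); };
        copy_ = [](void* dst, const void* src) { new (dst) Fn(*static_cast<const Fn*>(src)); };
        destroy_ = [](void* p) { static_cast<Fn*>(p)->~Fn(); };
    }

    InplaceAction(const InplaceAction& other) { copy_from(other); }
    InplaceAction& operator=(const InplaceAction& other) {
        if (this != &other) { reset(); copy_from(other); }
        return *this;
    }
    ~InplaceAction() { reset(); }

    void operator()(std::ostream* out) {
        assert(call_ != nullptr); // calling an empty action is a programming error
        call_(storage_, out);
    }
};
```

The capacity would be chosen to fit the largest lambda or function object the application passes to the logger; the static_assert reports any callable that exceeds it.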

Putting it all together, here is proof-of-concept (using second option):

LIVE DEMO

#include <boost/lockfree/spsc_queue.hpp>
#include <boost/optional.hpp>
#include <boost/variant.hpp>

#include <condition_variable>
#include <iostream>
#include <cstddef>
#include <thread>
#include <chrono>
#include <mutex>

using namespace std;

/*********************************************/
template<typename T, size_t N>
class concurrent_queue
{
    mutable boost::lockfree::spsc_queue<T, boost::lockfree::capacity<N>> q;
    mutable mutex m;
    mutable condition_variable c;

    void wait() const
    {
        unique_lock<mutex> u(m);
        c.wait_for(u, chrono::microseconds(1));
    }
    void notify() const
    {
        c.notify_one();
    }
public:
    void push(const T &t)
    {
        while(!q.push(t))
            wait();
        notify();
    }
    void pop(T &result)
    {
        while(!q.pop(result))
            wait();
        notify();
    }
};

/*********************************************/
template<typename T, typename F>
class concurrent
{
    typedef boost::optional<F> Job;

    mutable concurrent_queue<Job, 16> q; // use custom size
    mutable T x;
    thread worker;

public:
    concurrent(T x)
        : x{x}, worker{[this]
        {
            Job j;
            while(true)
            {
                q.pop(j);
                if(!j) break;
                (*j)(this->x); // you may need to handle exceptions in some way
            }
        }}
    {}
    void operator()(const F &f)
    {
        q.push(Job{f});
    }
    ~concurrent()
    {
        q.push(Job{});
        worker.join();
    }
};

/*********************************************/
struct LogEntry
{
    struct Formatter
    {
        typedef void result_type;
        ostream *out;

        void operator()(double x) const
        {
            *out << "floating point: " << x << endl;
        }
        void operator()(int x) const
        {
            *out << "integer: " << x << endl;
        }
    };
    boost::variant<int, double> data;

    void operator()(ostream *out)
    {
        boost::apply_visitor(Formatter{out}, data);
    }
};

/*********************************************/
int main()
{
    concurrent<ostream*, LogEntry> log{&cout};

    for(int i=0; i!=1024; ++i)
    {
        log({i});
        log({i/10.});
    }
}
Evgeny Panasyuk answered Oct 03 '22 00:10