Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

C++11 lockfree single producer single consumer: how to avoid busy wait

I'm trying to implement a class that uses two threads: one for the producer and one for the consumer. The current implementation does not use locks:

#include <boost/lockfree/spsc_queue.hpp>
#include <atomic>
#include <thread>

using Queue =
        boost::lockfree::spsc_queue<
            int,
            boost::lockfree::capacity<1024>>;

class Worker
{
public:
    Worker() : working_(false), done_(false) {}
    ~Worker() {
        done_ = true;    // exit even if the work has not been completed
        worker_.join();
    }

    void enqueue(int value) {
        queue_.push(value);
        if (!working_) {
            working_ = true;
            worker_ = std::thread([this]{ work(); });
        }
    }

    void work() {
        int value;
        while (!done_ && queue_.pop(value)) {
            std::cout << value << std::endl;
        }
        working_ = false;
    }

private:
    std::atomic<bool> working_;
    std::atomic<bool> done_;
    Queue queue_;
    std::thread worker_;
};

The application needs to enqueue work items for a certain amount of time and then sleep waiting for an event. This is a minimal main that simulates the behavior:

int main()
{
    Worker w;
    for (int i = 0; i < 1000; ++i)
        w.enqueue(i);
    std::this_thread::sleep_for(std::chrono::seconds(1));
    for (int i = 0; i < 1000; ++i)
        w.enqueue(i);
    std::this_thread::sleep_for(std::chrono::seconds(1));
}

I'm pretty sure that my implementation is bugged: what if the worker thread completes and before executing working_ = false, another enqueue comes? Is it possible to make my code thread safe without using locks?

The solution requires:

  • a fast enqueue
  • the destructor has to quit even if the queue is not empty
  • no busy wait, because there are long period of time in which the worker thread is idle
  • no locks if possible

Edit

I did another implementation of the Worker class, based on your suggestions. Here is my second attempt:

class Worker
{
public:
    Worker()
        : working_(ATOMIC_FLAG_INIT), done_(false) { } 

    ~Worker() {
        // exit even if the work has not been completed
        done_ = true;
        if (worker_.joinable())
            worker_.join();
    }

    bool enqueue(int value) {
        bool enqueued = queue_.push(value);
        if (!working_.test_and_set()) {
            if (worker_.joinable())
                worker_.join();
            worker_ = std::thread([this]{ work(); });
        }
        return enqueued;
    }

    void work() {
        int value;
        while (!done_ && queue_.pop(value)) {
            std::cout << value << std::endl;
        }
        working_.clear();
        while (!done_ && queue_.pop(value)) {
            std::cout << value << std::endl;
        }
    }

private:
    std::atomic_flag working_;
    std::atomic<bool> done_;
    Queue queue_;
    std::thread worker_;
};

I introduced the worker_.join() inside the enqueue method. This can impact the performances, but in very rare cases (when the queue gets empty and before the thread exits, another enqueue comes). The working_ variable is now an atomic_flag that is set in enqueue and cleared in work. The Additional while after working_.clear() is needed because if another value is pushed, before the clear, but after the while, the value is not processed.

Is this implementation correct?

I did some tests and the implementation seems to work.

OT: Is it better to put this as an edit, or an answer?

like image 267
mbrt Avatar asked Jun 09 '14 12:06

mbrt


2 Answers

what if the worker thread completes and before executing working_ = false, another enqueue comes?

Then the value will be pushed to the queue but will not be processed until another value is enqueued after the flag is set. You (or your users) may decide whether that is acceptable. This can be avoided using locks, but they're against your requirements.

The code may fail if the running thread is about to finish and sets working_ = false; but hasn't stopped running before next value is enqueued. In that case your code will call operator= on the running thread which results in a call to std::terminate according to the linked documentation.

Adding worker_.join() before assigning the worker to a new thread should prevent that.

Another problem is that queue_.push may fail if the queue is full because it has a fixed size. Currently you just ignore the case and the value will not be added to the full queue. If you wait for queue to have space, you don't get fast enqueue (in the edge case). You could take the bool returned by push (which tells if it was successful) and return it from enqueue. That way the caller may decide whether it wants to wait or discard the value.

Or use non-fixed size queue. Boost has this to say about that choice:

Can be used to completely disable dynamic memory allocations during push in order to ensure lockfree behavior. If the data structure is configured as fixed-sized, the internal nodes are stored inside an array and they are addressed by array indexing. This limits the possible size of the queue to the number of elements that can be addressed by the index type (usually 2**16-2), but on platforms that lack double-width compare-and-exchange instructions, this is the best way to achieve lock-freedom.

like image 61
eerorika Avatar answered Oct 07 '22 14:10

eerorika


Your worker thread needs more than 2 states.

  • Not running
  • Doing tasks
  • Idle shutdown
  • Shutdown

If you force shut down, it skips idle shutdown. If you run out of tasks, it transitions to idle shutdown. In idle shutdown, it empties the task queue, then goes into shutting down.

Shutdown is set, then you walk off the end of your worker task.

The producer first puts things on the queue. Then it checks the worker state. If Shutdown or Idle shutdown, first join it (and transition it to not running) then launch a new worker. If not running, just launch a new worker.

If the producer wants to launch a new worker, it first makes sure that we are in the not running state (otherwise, logic error). We then transition to the Doing tasks state, and then we launch the worker thread.

If the producer wants to shut down the helper task, it sets the done flag. It then checks the worker state. If it is anything besides not running, it joins it.

This can result in a worker thread that is launched for no good reason.

There are a few cases where the above can block, but there where a few before as well.

Then, we write a formal or semi-formal proof that the above cannot lose messages, because when writing lock free code you aren't done until you have a proof.

like image 36
Yakk - Adam Nevraumont Avatar answered Oct 07 '22 14:10

Yakk - Adam Nevraumont