Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

thread sync using mutex and condition variable

I'm trying to implement an multi-thread job, a producer and a consumer, and basically what I want to do is, when consumer finishes the data, it notifies the producer so that producer provides new data.

The tricky part is, in my current impl, producer and consumer both notifies each other and waits for each other, I don't know how to implement this part correctly.

For example, see the code below,

mutex m;
condition_variable cv;

vector<int> Q;  // this is the queue the consumer will consume
vector<int> Q_buf;  // this is a buffer Q into which producer will fill new data directly

// consumer
void consume() {
  while (1) {
    if (Q.size() == 0) {  // when consumer finishes data
      unique_lock<mutex> lk(m);
      // how to notify producer to fill up the Q?
      ...
      cv.wait(lk);
    }

    // for-loop to process the elems in Q
    ...
  }
}

// producer
void produce() {
  while (1) {
    // for-loop to fill up Q_buf
    ...

    // once Q_buf is fully filled, wait until consumer asks to give it a full Q
    unique_lock<mutex> lk(m);
    cv.wait(lk);
    Q.swap(Q_buf);  // replace the empty Q with the full Q_buf
    cv.notify_one();
  }
}

I'm not sure this the above code using mutex and condition_variable is the right way to implement my idea, please give me some advice!

like image 923
avocado Avatar asked May 07 '18 14:05

avocado


2 Answers

The code incorrectly assumes that vector<int>::size() and vector<int>::swap() are atomic. They are not.

Also, spurious wakeups must be handled by a while loop (or another cv::wait overload).

Fixes:

mutex m;
condition_variable cv;
vector<int> Q;

// consumer
void consume() {
    while(1) {
        // Get the new elements.
        vector<int> new_elements;
        {
            unique_lock<mutex> lk(m);
            while(Q.empty())
                cv.wait(lk);
            new_elements.swap(Q);
        }
        // for-loop to process the elems in new_elements
    }
}

// producer
void produce() {
    while(1) {
        vector<int> new_elements;
        // for-loop to fill up new_elements

        // publish new_elements
        {
            unique_lock<mutex> lk(m);
            Q.insert(Q.end(), new_elements.begin(), new_elements.end());
            cv.notify_one();
        }
    }
}
like image 137
Maxim Egorushkin Avatar answered Sep 26 '22 14:09

Maxim Egorushkin


Maybe that is close to what you want to achive. I used 2 conditional variables to notify producers and consumers between each other and introduced variable denoting which turn is now:

#include <ctime>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

template<typename T>
class ReaderWriter {
    private:
        std::vector<std::thread> readers;
        std::vector<std::thread> writers;
        std::condition_variable readerCv, writerCv;
        std::queue<T> data;
        std::mutex readerMutex, writerMutex;
        size_t noReaders, noWriters;
        enum class Turn { WRITER_TURN, READER_TURN };
        Turn turn;
        void reader() {
            while (1) {
                {
                    std::unique_lock<std::mutex> lk(readerMutex);    
                    while (turn != Turn::READER_TURN) {
                        readerCv.wait(lk);
                    }
                    std::cout << "Thread : " << std::this_thread::get_id() << " consumed " << data.front() << std::endl;
                    data.pop();
                    if (data.empty()) {
                        turn = Turn::WRITER_TURN;
                        writerCv.notify_one();
                    }
                }
            }
        }

        void writer() {
            while (1) {
                {
                    std::unique_lock<std::mutex> lk(writerMutex);
                    while (turn != Turn::WRITER_TURN) {
                        writerCv.wait(lk);
                    }
                    srand(time(NULL));
                    int random_number = std::rand();
                    data.push(random_number);
                    std::cout << "Thread : " << std::this_thread::get_id() << " produced " << random_number << std::endl;
                    turn = Turn::READER_TURN;
                }
                readerCv.notify_one();
            }
        }

    public:
        ReaderWriter(size_t noReadersArg, size_t noWritersArg) : noReaders(noReadersArg), noWriters(noWritersArg), turn(ReaderWriter::Turn::WRITER_TURN) {
        }

        void run() {
            int noReadersArg = noReaders, noWritersArg = noWriters;
            while (noReadersArg--) {
                readers.emplace_back(&ReaderWriter::reader, this);
            }

            while (noWritersArg--) {
                writers.emplace_back(&ReaderWriter::writer, this);
            }
        }

        ~ReaderWriter() {
            for (auto& r : readers) {
                r.join();
            }
            for (auto& w : writers) {
                w.join();
            }
        }
};

int main() {
    ReaderWriter<int> rw(5, 5);
    rw.run();
}
like image 37
Mateusz Wojtczak Avatar answered Sep 25 '22 14:09

Mateusz Wojtczak