Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

consumer/producer in c++

This is a classic c/p problem where some threads produce data while other read the data. Both the producer and consumers are sharing a const sized buffer. If the buffer is empty then the consumers have to wait and if it is full then the producer has to wait. I am using semaphores to keep track of full or empty queues. The producer is going to decrement free spots semaphore, add value, and increment filled slots semaphore. So I am trying to implement a program that gets some numbers from the generator function, and then prints out the average of the numbers. By treating this as a producer-consumer problem, I am trying to save some time in the execution of the program. The generateNumber function causes some delay in the process so I want to create a number of threads that generate numbers, and put them into a queue. Then the "main thread" which is running the main function has to read from the queue and find the sum and then average. So here is what I have so far:

#include <cstdio> 
#include <cstdlib>
#include <time.h>
#include "Thread.h" 
#include <queue> 

int generateNumber() {
    int delayms = rand() / (float) RAND_MAX * 400.f + 200;
    int result = rand() / (float) RAND_MAX * 20;
    struct timespec ts;
    ts.tv_sec = 0;
    ts.tv_nsec = delayms * 1000000;
    nanosleep(&ts, NULL);
    return result; }


struct threadarg {
    Semaphore filled(0);
    Semaphore empty(n);
    std::queue<int> q; };


void* threadfunc(void *arg) {
    threadarg *targp = (threadarg *) arg;
    threadarg &targ = *targp;
    while (targ.empty.value() != 0) {
        int val = generateNumber();
        targ.empty.dec(); 
        q.push_back(val);
        targ.filled.inc(); }
}
int main(int argc, char **argv) {
    Thread consumer, producer;
    // read the command line arguments
    if (argc != 2) {
        printf("usage: %s [nums to average]\n", argv[0]);
        exit(1); }
    int n = atoi(argv[1]);
    // Seed random number generator
    srand(time(NULL));
}

I am a bit confused now because I am not sure how to create multiple producer threads that are generating numbers (if q is not full) while the consumer is reading from the queue (that is if q is not empty). I am not sure what to put in the main to implment it. also in "Thread.h", you can create a thread, a mutex, or a semaphore. The thread has the methods .run(threadFunc, arg), .join(), etc. A mutex can be locked or unlocked. The semaphore methods have all been used in my code.

like image 514
Dan P. Avatar asked Feb 21 '23 09:02

Dan P.


1 Answers

Your queue is not synchronized, so multiple producers could call push_back at the same time, or at the same time the consumer is calling pop_front ... this will break.

The simple approach to making this work is to use a thread-safe queue, which can be a wrapper around the std::queue you already have, plus a mutex.

You can start by adding a mutex, and locking/unlocking it around each call you forward to std::queue - for a single consumer that should be sufficient, for multiple consumers you'd need to fuse front() and pop_front() into a single synchronized call.

To let the consumer block while the queue is empty, you can add a condition variable to your wrapper.

That should be enough that you can find the answer online - sample code below.


template <typename T> class SynchronizedQueue
{
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable condvar_;

    typedef std::lock_guard<std::mutex> lock;
    typedef std::unique_lock<std::mutex> ulock;

public:
    void push(T const &val)
    {
        lock l(mutex_); // prevents multiple pushes corrupting queue_
        bool wake = queue_.empty(); // we may need to wake consumer
        queue_.push(val);
        if (wake) condvar_.notify_one();
    }

    T pop()
    {
        ulock u(mutex_);
        while (queue_.empty())
            condvar_.wait(u);
        // now queue_ is non-empty and we still have the lock
        T retval = queue_.front();
        queue_.pop();
        return retval;
    }
};

Replace std::mutex et al with whatever primitives your "Thread.h" gives you.

like image 140
Useless Avatar answered Mar 02 '23 19:03

Useless