Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Any obvious problems or improvements for my producer consumer queue

I asked a previous question about producer/consumer code that was overly general (though the answers were certainly helpful). So I've taken the suggestions from an earlier SO question by another author and converted them to C++ and boost. However I'm always a bit concerned about multithreaded code - so if anyone can see any obvious improvements I'd love to hear about them.

#include <pthread.h>
#include <deque>
#include <iostream>

#include "boost/thread.hpp"


class MyQueue
{
protected:
  boost::mutex mutex_;
  boost::condition_variable condition_;
  bool cancel_;
  std::deque<int> data_;

public:
  MyQueue() : mutex_(), condition_(), cancel_(false), data_()
  {
  }

  struct Canceled{};

  void push( int i )
  {
     boost::lock_guard<boost::mutex> l(mutex_);
     if(cancel_) throw Canceled();
     data_.push_back(i);
     condition_.notify_all();
  }

  void pop( int & i )
  {
     boost::unique_lock<boost::mutex> l(mutex_);
     while(! cancel_ && data_.size()==0 )
     {
        condition_.wait( l );
     }
     if(cancel_) throw Canceled();

     assert( data_.size() != 0 );
     i = data_.front();
     data_.pop_front();
  }

  void cancel()
  {
     boost::lock_guard<boost::mutex> l(mutex_);
     if( cancel_) throw Canceled();
     cancel_ = true;
     condition_.notify_all();
  }
};


boost::mutex iomutex;

void producer( MyQueue * q, const std::string & name )
try
{
  for(unsigned int i=0 ; i<20; ++i)
  {
    q->push( i );
    boost::lock_guard<boost::mutex> l(iomutex);
    std::cout<<name<<"  PRODUCED "<<i<<std::endl;
  }

  sleep(1);
  q->cancel();
  {
    boost::lock_guard<boost::mutex> l(iomutex);
    std::cout<<name<<"  PRODUCER EXITING NORMALLY"<<std::endl;
  }
}
catch( MyQueue::Canceled & c )
{
  boost::lock_guard<boost::mutex> l(iomutex);
  std::cout<<name<<"  PRODUCER CANCLED "<<std::endl;
}

void consumer( MyQueue * q, const std::string & name )
try
{
  while(true)
  {
    int i;
    q->pop( i );
    boost::lock_guard<boost::mutex> l(iomutex);
    std::cout<<name<<"  CONSUMED "<<i<<std::endl;
  }
}
catch( MyQueue::Canceled & c )
{
  boost::lock_guard<boost::mutex> l(iomutex);
  std::cout<<name<<"  CONSUMER CANCLED "<<std::endl;
}

int main()
{
  MyQueue q;
  boost::thread pr1( producer, &q, "pro1");
  boost::thread pr2( producer, &q, "pro2");
  boost::thread cons1( consumer, &q, "con1");
  boost::thread cons2( consumer, &q, "con2");

  pr1.join();
  pr2.join();
  cons1.join();
  cons2.join();
}

UPDATE: I ended up using a modified version of Anthony Williams' concurrent queue. My modified version can be found here.

like image 760
Michael Anderson Avatar asked Jul 19 '10 06:07

Michael Anderson


People also ask

What are the problems in the producer-consumer problem?

Producer-Consumer problem is a classical synchronization problem in the operating system. With the presence of more than one process and limited resources in the system the synchronization problem arises. If one resource is shared between more than one process at the same time then it can lead to data inconsistency.

What is producer-consumer problem explain with example?

The producer consumer problem is a synchronization problem. There is a fixed size buffer and the producer produces items and enters them into the buffer. The consumer removes the items from the buffer and consumes them.

What is a producer-consumer queue?

Producer-consumer queues are one of the most fundamental components in concurrent systems, they represent means to transfer data/messages/tasks/transactions between threads/stages/agents.

How do you implement producer-consumer problem?

Solution: The producer is to either go to sleep or discard data if the buffer is full. The next time the consumer removes an item from the buffer, it notifies the producer, who starts to fill the buffer again. In the same manner, the consumer can go to sleep if it finds the buffer to be empty.


1 Answers

If you're worried about potential pitfalls in your implementation, you can try using Anthony Williams' (maintainer of the Boost.Thread library) excellent thread-safe, multiple-producer, multiple-consumer queue.

like image 89
tmatth Avatar answered Sep 22 '22 13:09

tmatth