
Shared Queue in C++

I receive packets from the network and enqueue them in one thread, then consume (dequeue) them in another thread.

So I decided to use the Boost library to build a shared queue, based on https://www.quantnet.com/cplusplus-multithreading-boost/

template <typename T>
class SynchronisedQueue
{
private:
    std::queue<T> m_queue;  // Use STL queue to store data
    boost::mutex m_mutex;   // The mutex to synchronise on
    boost::condition_variable m_cond;// The condition to wait for

public:

    // Add data to the queue and notify others
    void Enqueue(const T& data)
    {
        // Acquire lock on the queue
        boost::unique_lock<boost::mutex> lock(m_mutex);

        // Add the data to the queue
        m_queue.push(data);

        // Notify others that data is ready
        m_cond.notify_one();

    } // Lock is automatically released here

    // Get data from the queue. Wait for data if not available
    T Dequeue()
    {

        // Acquire lock on the queue
        boost::unique_lock<boost::mutex> lock(m_mutex);

        // When there is no data, wait till someone fills it.
        // Lock is automatically released in the wait and obtained 
        // again after the wait
        while (m_queue.size()==0) m_cond.wait(lock);

        // Retrieve the data from the queue
        T result=m_queue.front(); m_queue.pop();
        return result;

    } // Lock is automatically released here
};

The problem is that when no data arrives, Dequeue() blocks my consumer thread, and when I want to terminate the consumer thread I sometimes cannot stop it.

What is the suggested way to unblock Dequeue() so that I can safely terminate the thread that consumes packets? Any ideas or suggestions?

PS: The site https://www.quantnet.com/cplusplus-multithreading-boost/ uses "boost::this_thread::interruption_point();" to stop the consumer thread. Because of my legacy code structure, this is not possible for me.

Based on the answer, I updated the shared queue like this:

#include <queue>
#include <boost/thread.hpp>

template <typename T>
class SynchronisedQueue
{
public:

    SynchronisedQueue()
    {
        RequestToEnd = false;  
        EnqueueData = true;
    }
    void Enqueue(const T& data)
    {
        boost::unique_lock<boost::mutex> lock(m_mutex);

        if(EnqueueData)
        {
            m_queue.push(data);
            m_cond.notify_one();
        }

    } 


    bool TryDequeue(T& result)
    {
        boost::unique_lock<boost::mutex> lock(m_mutex);

        while (m_queue.empty() && (! RequestToEnd)) 
        { 
            m_cond.wait(lock);
        }

        if( RequestToEnd )
        {
             DoEndActions();
             return false;
        }

        result= m_queue.front(); m_queue.pop();

        return true;
    }

    void StopQueue()
    {
        RequestToEnd =  true;
        Enqueue(NULL);        
    }

    int Size()
    {
        boost::unique_lock<boost::mutex> lock(m_mutex);
        return m_queue.size();

    }

private:

    void DoEndActions()
    {
        EnqueueData = false;

        while (!m_queue.empty())  
        {
            m_queue.pop();
        }
    }



    std::queue<T> m_queue;              // Use STL queue to store data
    boost::mutex m_mutex;               // The mutex to synchronise on
    boost::condition_variable m_cond;            // The condition to wait for

    bool RequestToEnd;
    bool EnqueueData;
};

And here is my test driver:

#include <iostream>
#include <string>

#include "SynchronisedQueue.h"

using namespace std;

SynchronisedQueue<int> MyQueue;

void InsertToQueue()
{
    int i= 0;

    while(true)
    {
        MyQueue.Enqueue(++i);
    }

}

void ConsumeFromQueue()
{
    while(true)
    {
        int number;

        cout << "Now try to dequeue" << endl;

        bool success = MyQueue.TryDequeue(number);

        if(success)
        {

            cout << "value is " << number << endl;

        }

        else
        {
            cout << " queue is stopped" << endl;
            break;

        }
    }


    cout << "Queue size is : " << MyQueue.Size() <<  endl;
}



int main()
{

    cout << "Test Started" << endl;

    boost::thread startInsertIntoQueue = boost::thread(InsertToQueue);
    boost::thread consumeFromQueue = boost::thread(ConsumeFromQueue);

    boost::this_thread::sleep(boost::posix_time::seconds(5)); //After 5 seconds

    MyQueue.StopQueue();

    int endMain;

    cin >> endMain;


    return 0;
}

For now it seems to work. Based on new suggestions, I changed the stop method to:

    void StopQueue()
    {
        boost::unique_lock<boost::mutex> lock(m_mutex);
        RequestToEnd = true;
        m_cond.notify_one();  // wakes one waiting consumer; use notify_all() if several threads dequeue
    }
Novalis asked Apr 13 '12 10:04


2 Answers

Two easy ways to let the thread end:

  1. Send an end message on the queue.
  2. Add a second condition to the condition-variable wait that tells the consumer to end.

    while(m_queue.empty() && (! RequestToEnd)) m_cond.wait(lock);
    if (RequestToEnd) { doEndActions(); }
    else { T result=m_queue.front(); m_queue.pop(); return result; }
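Option 2 could look roughly like the sketch below. It is only an illustration under some assumptions: it uses the C++11 `std::mutex` and `std::condition_variable` instead of the Boost types so that it builds without Boost (the Boost classes have the same shape), and the names `StoppableQueue`, `Stop` and `RunStopDemo` are made up for this example. Items still in the queue when the stop is requested are simply left unprocessed here.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Illustrative queue; std:: primitives stand in for the boost:: ones.
template <typename T>
class StoppableQueue
{
public:
    // Adds an item. Returns false if the queue has already been stopped.
    bool Enqueue(const T& data)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_stopRequested) return false;
        m_queue.push(data);
        m_cond.notify_one();
        return true;
    }

    // Blocks until an item arrives or Stop() is called.
    // Returns false once the queue has been stopped.
    bool TryDequeue(T& result)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        // The predicate is re-checked after every wake-up, so spurious
        // wake-ups and a stop request are both handled.
        m_cond.wait(lock, [this] { return m_stopRequested || !m_queue.empty(); });
        if (m_stopRequested) return false;
        result = m_queue.front();
        m_queue.pop();
        return true;
    }

    // Sets the flag under the lock, then wakes every waiting consumer.
    void Stop()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_stopRequested = true;
        }
        m_cond.notify_all();
    }

private:
    std::queue<T> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_cond;
    bool m_stopRequested = false;
};

// Hypothetical driver: starts a consumer, stops the queue and joins.
// Returns true if an Enqueue() after Stop() is rejected.
bool RunStopDemo()
{
    StoppableQueue<int> queue;
    std::thread consumer([&queue] {
        int value;
        while (queue.TryDequeue(value)) { /* process value here */ }
    });
    queue.Enqueue(1);
    queue.Enqueue(2);
    queue.Stop();
    consumer.join();  // returns because Stop() released the blocked wait
    return !queue.Enqueue(3);
}
```

Setting the flag while holding the mutex matters: if it were written without the lock, the consumer could check the flag, miss the notification, and then block forever. Joining the consumer after `Stop()` is what makes the shutdown safe for the rest of the program.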
    
stefaanv answered Oct 24 '22 19:10


First, do you really need to terminate the thread? If not, don't.

If you do have to, then just queue it a suicide pill. I usually send a NULL cast to T. The thread checks T and, if NULL, cleans up, returns and so dies.

Also, you may need to purge the queue first by removing and delete()ing all the items.
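As a rough sketch of the end-message idea, assuming the queue carries pointers so that a null pointer can serve as the end message: the example below uses `std::unique_ptr` and the C++11 standard threading types rather than Boost, and `Packet`, `BlockingQueue`, `ConsumeUntilPill` and `RunPillDemo` are hypothetical names invented for the illustration. With `std::unique_ptr`, any items left in the queue are freed automatically, so no manual delete is needed in this variant.

```cpp
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical payload type standing in for a network packet.
struct Packet
{
    int id;
};

// Plain blocking queue with no stop flag; shutdown is signalled in-band.
template <typename T>
class BlockingQueue
{
public:
    void Enqueue(T data)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(std::move(data));
        }
        m_cond.notify_one();
    }

    // Blocks until an item is available, then returns it.
    T Dequeue()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cond.wait(lock, [this] { return !m_queue.empty(); });
        T result = std::move(m_queue.front());
        m_queue.pop();
        return result;
    }

private:
    std::queue<T> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_cond;
};

// Consumes until it receives a null pointer (the end message).
// Returns how many real packets were handled.
int ConsumeUntilPill(BlockingQueue<std::unique_ptr<Packet>>& queue)
{
    int handled = 0;
    for (;;)
    {
        std::unique_ptr<Packet> packet = queue.Dequeue();
        if (!packet) break;  // null pointer: clean up and leave the loop
        ++handled;           // real work on *packet would go here
    }
    return handled;
}

// Hypothetical driver: sends three packets, then the end message.
int RunPillDemo()
{
    BlockingQueue<std::unique_ptr<Packet>> queue;
    int handled = 0;
    std::thread consumer([&] { handled = ConsumeUntilPill(queue); });
    for (int i = 0; i < 3; ++i)
        queue.Enqueue(std::unique_ptr<Packet>(new Packet{i}));
    queue.Enqueue(nullptr);  // the end message
    consumer.join();
    return handled;
}
```

Because the queue is first-in, first-out, the consumer handles everything queued before the end message and then exits. If several consumer threads share one queue, each of them needs its own end message.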

Martin James answered Oct 24 '22 19:10