I just simply get packets from network, and Enqueue them in one thread and then consume this packets (Dequeue) in an other thread.
So i decide to use boost library to make a shared queue based on https://www.quantnet.com/cplusplus-multithreading-boost/
template <typename T>
class SynchronisedQueue
{
private:
std::queue<T> m_queue; // Use STL queue to store data
boost::mutex m_mutex; // The mutex to synchronise on
boost::condition_variable m_cond;// The condition to wait for
public:
// Add data to the queue and notify others
void Enqueue(const T& data)
{
// Acquire lock on the queue
boost::unique_lock<boost::mutex> lock(m_mutex);
// Add the data to the queue
m_queue.push(data);
// Notify others that data is ready
m_cond.notify_one();
} // Lock is automatically released here
// Get data from the queue. Wait for data if not available
T Dequeue()
{
// Acquire lock on the queue
boost::unique_lock<boost::mutex> lock(m_mutex);
// When there is no data, wait till someone fills it.
// Lock is automatically released in the wait and obtained
// again after the wait
while (m_queue.size()==0) m_cond.wait(lock);
// Retrieve the data from the queue
T result=m_queue.front(); m_queue.pop();
return result;
} // Lock is automatically released here
};
The problem is , while not getting any data, Dequeue() method blocks my consumer thread, and when i want to terminate consumer thread i can not able to end it or stop it sometimes.
What is the suggested way to end blocking of Dequeue(), so that i can safely terminate the thread that consume packets? Any ideas suggestions?
PS: The site https://www.quantnet.com/cplusplus-multithreading-boost/ use "boost::this_thread::interruption_point();" for stopping consumer thread ... Because of my legacy code structure this is not possible for me...
Based on Answer I update Shared Queue like this:
#include <queue>
#include <boost/thread.hpp>
template <typename T>
class SynchronisedQueue
{
public:
SynchronisedQueue()
{
RequestToEnd = false;
EnqueueData = true;
}
void Enqueue(const T& data)
{
boost::unique_lock<boost::mutex> lock(m_mutex);
if(EnqueueData)
{
m_queue.push(data);
m_cond.notify_one();
}
}
bool TryDequeue(T& result)
{
boost::unique_lock<boost::mutex> lock(m_mutex);
while (m_queue.empty() && (! RequestToEnd))
{
m_cond.wait(lock);
}
if( RequestToEnd )
{
DoEndActions();
return false;
}
result= m_queue.front(); m_queue.pop();
return true;
}
void StopQueue()
{
RequestToEnd = true;
Enqueue(NULL);
}
int Size()
{
boost::unique_lock<boost::mutex> lock(m_mutex);
return m_queue.size();
}
private:
void DoEndActions()
{
EnqueueData = false;
while (!m_queue.empty())
{
m_queue.pop();
}
}
std::queue<T> m_queue; // Use STL queue to store data
boost::mutex m_mutex; // The mutex to synchronise on
boost::condition_variable m_cond; // The condition to wait for
bool RequestToEnd;
bool EnqueueData;
};
And Here is my Test Drive:
#include <iostream>
#include <string>
#include "SynchronisedQueue.h"
using namespace std;
SynchronisedQueue<int> MyQueue;
void InsertToQueue()
{
int i= 0;
while(true)
{
MyQueue.Enqueue(++i);
}
}
void ConsumeFromQueue()
{
while(true)
{
int number;
cout << "Now try to dequeue" << endl;
bool success = MyQueue.TryDequeue(number);
if(success)
{
cout << "value is " << number << endl;
}
else
{
cout << " queue is stopped" << endl;
break;
}
}
cout << "Que size is : " << MyQueue.Size() << endl;
}
int main()
{
cout << "Test Started" << endl;
boost::thread startInsertIntoQueue = boost::thread(InsertToQueue);
boost::thread consumeFromQueue = boost::thread(ConsumeFromQueue);
boost::this_thread::sleep(boost::posix_time::seconds(5)); //After 5 seconds
MyQueue.StopQueue();
int endMain;
cin >> endMain;
return 0;
}
For now it seems to work...Based on new suggestions:
i change Stop Method as:
void StopQueue()
{
boost::unique_lock<boost::mutex> lock(m_mutex);
RequestToEnd = true;
m_cond.notify_one();
}
2 easy solutions to let the thread end:
add another condition to the condition variable to command to end
while(queue.empty() && (! RequestToEnd)) m_cond.wait(lock);
if (RequestToEnd) { doEndActions(); }
else { T result=m_queue.front(); m_queue.pop(); return result; }
First, do you really need to terminate the thread? If not, don't.
If you do have to, then just queue it a suicide pill. I usually send a NULL cast to T. The thread checks T and, if NULL, cleans up, returns and so dies.
Also, you may need to purge the queue first by removing and delete()ing all the items.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With