My process reads tasks from a single queue and sends them to several destinations. We need to maintain order between the tasks (i.e. a task that arrived in the queue at 00:00 needs to be sent before a task that arrived at 00:01), therefore we cannot use a thread pool. Order needs to be maintained per destination.
One solution is to create a dedicated thread per destination. The main thread reads the task from the queue and depending on the destination finds the correct thread.
This solution has a problem: if a worker thread is busy, the master thread would remain blocked, making the system slow. What I need is a new queue per thread. The master thread distributes the tasks to those queues, and each worker thread reads its own queue for incoming messages...
I would like to share my thoughts with the SO community, and I am searching for a C/C++ solution close to my description. Is there a library that implements such a model?
The design you want is fairly straightforward; I think you can probably write the code you need and get it working in an hour or two. Looking for a 3rd party library to implement this is probably overkill (unless I am misunderstanding the problem).
In particular, for each 'worker' thread, you need a FIFO data structure (e.g. std::queue), a Mutex, and a mechanism that the 'master' thread can use to signal the thread to wake up and check the data structure for new messages (e.g. a condition variable, or a semaphore, or even a socketpair that the worker blocks on reading, and the master can send a byte on to wake the worker up).
Then to send a task to a particular worker thread, the master would do something like this (pseudocode):
struct WorkerThreadData & workerThread = _workerThreads[threadIndexIWantToSendTo];
workerThread.m_mutex.Lock();
workerThread.m_incomingTasks.push_back(theNewTaskObject);
workerThread.m_mutex.Unlock();
workerThread.m_signalMechanism.SignalThreadToWakeUp(); // make sure the worker looks at the task list!
... and each worker thread would have an event loop like this:
struct WorkerThreadData & myData = _workerThreads[myWorkerIndex];
while(1)
{
   myData.m_signalMechanism.WaitForSignal(); // block until the main thread wakes me up
   while(1) // drain the list: one wake-up may cover several queued tasks
   {
      TaskObject * taskObject = NULL;
      myData.m_mutex.Lock();
      if (myData.m_incomingTasks.size() > 0)
      {
         taskObject = myData.m_incomingTasks.front();
         myData.m_incomingTasks.pop_front();
      }
      myData.m_mutex.Unlock();
      if (taskObject == NULL) break; // nothing left, go back to waiting
      taskObject->DoTheWork(); // the mutex is not held while working
      delete taskObject;
   }
}
This will never block the master thread (for any significant amount of time), since the Mutex is only held very briefly by anyone. In particular, the worker threads are not holding the mutex while they are working on a task object.
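For reference, here is a minimal sketch of the same idea in standard C++11, assuming a condition variable is chosen as the wake-up mechanism and that tasks can be represented as std::function<void()>. The names SerialWorker and post are made up for this illustration and do not come from any library.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical name: one worker thread that owns a private FIFO of tasks.
class SerialWorker {
public:
    using Task = std::function<void()>;

    SerialWorker() : thread_([this] { run(); }) {}

    ~SerialWorker() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        cv_.notify_one();
        thread_.join();  // the worker finishes all queued tasks before exiting
    }

    SerialWorker(const SerialWorker&) = delete;
    SerialWorker& operator=(const SerialWorker&) = delete;

    // Called by the master thread; the mutex is held only while pushing.
    void post(Task task) {
        assert(task && "post() expects a callable task");
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        cv_.notify_one();
    }

private:
    void run() {
        for (;;) {
            Task task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                // The predicate guards against lost and spurious wake-ups.
                cv_.wait(lock, [this] { return stopping_ || !tasks_.empty(); });
                if (tasks_.empty()) return;  // only reachable once stopping_ is set
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();  // executed without holding the mutex
        }
    }

    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<Task> tasks_;
    bool stopping_ = false;
    std::thread thread_;  // declared last so the other members exist before it starts
};
```

The master would keep one SerialWorker per destination and call post() on the matching one. Because each worker pops from its own queue in FIFO order and runs one task at a time, tasks for a given destination execute in the order they were posted.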
The "need to maintain order" all-but-directly states that you're going to be executing the tasks serially no matter how many threads you have. That being the case, you're probably best off with just one thread servicing the requests.
You could gain something if the requirement is a bit looser than that -- for example, if all the tasks for one destination need to remain in order, but there's no ordering requirement for tasks with different destinations. If this is the case, then your solution of a master queue sending tasks to an input queue for each individual thread sounds like quite a good one.
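If there are more destinations than threads, one possible way to keep per-destination ordering is to always route a given destination to the same worker. The sketch below assumes destinations are identified by a string; the helper name workerIndexFor is hypothetical, and hashing is just one option (a lookup table would work equally well).

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>

// Hypothetical helper: maps a destination name to a fixed worker index.
// The same destination always yields the same index within one run of the
// process, so all of its tasks land in the same per-thread queue and stay in order.
std::size_t workerIndexFor(const std::string& destination, std::size_t workerCount) {
    assert(workerCount > 0 && "need at least one worker");
    return std::hash<std::string>{}(destination) % workerCount;
}
```

Several destinations may share a worker with this scheme, which costs some parallelism but never reorders tasks within a destination.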
Edit:
Specifying the number of threads/mutexes dynamically is pretty easy. For example, to take the number from the command line, you could do something on this order (leaving out error and sanity checking for the moment):
std::vector<pthread_t> threads;
int num_threads = atoi(argv[1]);
threads.resize(num_threads);
for (int i=0; i<num_threads; i++)
    pthread_create(&threads[i], NULL, thread_routine, NULL);
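A rough sketch of what the omitted error and sanity checking could look like follows. The helper names parseThreadCount and startThreads, and the upper bound passed as maxThreads, are assumptions made for this example; only strtol and pthread_create are real library calls.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <cstdlib>
#include <pthread.h>
#include <vector>

// Hypothetical helper: parses a thread count in [1, maxThreads].
// Returns false (leaving count untouched) on empty, non-numeric or out-of-range input.
bool parseThreadCount(const char* text, int maxThreads, int& count) {
    if (text == nullptr || *text == '\0') return false;
    errno = 0;
    char* end = nullptr;
    const long value = std::strtol(text, &end, 10);
    if (errno != 0 || *end != '\0') return false;  // overflow or trailing junk
    if (value < 1 || value > maxThreads) return false;
    count = static_cast<int>(value);
    return true;
}

// Hypothetical helper: starts up to `count` threads running `routine`.
// Stops at the first pthread_create failure and returns how many were started.
std::size_t startThreads(std::vector<pthread_t>& threads, int count,
                         void* (*routine)(void*)) {
    assert(routine != nullptr);
    threads.clear();
    for (int i = 0; i < count; ++i) {
        pthread_t handle;
        if (pthread_create(&handle, nullptr, routine, nullptr) != 0) break;
        threads.push_back(handle);
    }
    return threads.size();
}
```

In main() you would call parseThreadCount(argv[1], ...) after checking argc, and compare the value returned by startThreads against the requested count to decide whether to continue with fewer threads or bail out.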