
Multi-threaded C++ Message Passing

I have been tasked with modifying a synchronous C program so that it can run in parallel. It is an open source program that many people use, so the result should be as portable as possible. For that reason, I thought it would be best to wrap the program in a C++ layer so that I could take advantage of the portable Boost libraries. I have already done this and everything seems to work as expected.

The problem I am having is deciding on the best approach for passing messages between the threads. Luckily, the program has a multiple-producer, single-consumer architecture. Even better, the order of the messages is not important. I have read that single-producer/single-consumer (SPSC) queues would suit this architecture. Does anyone experienced with multi-threaded programming have any advice? I'm quite new to this stuff. Any code examples using Boost to implement SPSC would also be greatly appreciated.

grouma asked Aug 04 '12 01:08


3 Answers

Below is the technique I used for my cooperative multi-tasking / multi-threading library, MACE (http://bytemaster.github.com/mace/). It is lock-free except when the queue is empty: the mutex is only taken by the producer that posts onto an empty queue and by the consumer just before it blocks.

#include <boost/atomic.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>

struct task {
   boost::function<void()> func;
   task*                   next;
};

boost::mutex                     task_ready_mutex;
boost::condition_variable        task_ready;
boost::atomic<task*>             task_in_queue(0);
// Set to true to stop the consumer; post a task afterwards to wake it if it is blocked.
boost::atomic<bool>              done(false);

// This can be called from any thread.
void post_task( task* t ) {
   // Atomically push the task onto the head of the list.
   task* stale_head = task_in_queue.load(boost::memory_order_relaxed);
   do { t->next = stale_head;
   } while( !task_in_queue.compare_exchange_weak( stale_head, t, boost::memory_order_release ) );

   // Only the thread that posts the first task onto an empty list sees a null
   // stale_head, so only that thread attempts to acquire the lock. There should
   // be no contention on this lock except when the consumer thread is about to
   // block on the condition variable.
   if( !stale_head ) {
      boost::unique_lock<boost::mutex> lock(task_ready_mutex);
      task_ready.notify_one();
   }
}

// This is the consumer thread.
void process_tasks() {
   while( !done ) {
      // Atomically take everything that has been posted so far.
      // (acquire is used here instead of consume; it is at least as strong.)
      task* pending = task_in_queue.exchange(0, boost::memory_order_acquire);

      if( !pending ) { // lock scope
         boost::unique_lock<boost::mutex> lock(task_ready_mutex);
         // Check one last time while holding the lock before blocking.
         if( !task_in_queue.load() ) task_ready.wait( lock );
      }

      // pending is a linked list in reverse post order, so process it from
      // tail to head (or reverse it first) if you want to maintain order.
      while( pending ) {
         task* next = pending->next;
         pending->func();
         pending = next;   // free or recycle the finished task here if the consumer owns it
      }
   }
}
bytemaster answered Nov 16 '22 16:11


There are many examples of producer-consumer queues on the net that are safe for multiple producers and consumers. @bytemaster posted one that uses a link inside each message to eliminate storage in the queue class itself. That's a fine approach; I use it myself on embedded jobs.

Where the queue class must provide storage, I usually go with a 'pool queue' of size N, loaded with pointers to N message instances at startup. A thread that needs to communicate pops a message pointer from the pool, fills the message in and queues it on to the receiver. When the message is eventually 'used up', its pointer is pushed back onto the pool. This caps the number of messages in flight at N, so every queue need only be of length N: no resizing, no new, no delete, and easy leak detection (at shutdown the pool should hold N messages again).
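The pool-queue idea can be sketched roughly as follows. This is only an illustration under stated assumptions, not the answerer's actual code: the names `Message`, `MessageQueue` and `run_pool_demo` are hypothetical, the queue is a plain mutex-and-condition-variable queue, and the C++11 standard library is used instead of Boost so that the sketch is self-contained (`boost::mutex`, `boost::condition_variable` and `boost::thread` offer equivalent interfaces). A `std::deque` stands in for a fixed ring of length N to keep the sketch short.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical message type; the payload is illustrative only.
struct Message {
    int producer_id = 0;
    int value = 0;
};

// Minimal blocking queue of message pointers (assumed design, not from the answer).
class MessageQueue {
public:
    void push(Message* m) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push_back(m);
        }
        ready_.notify_one();
    }
    Message* pop() {  // blocks while the queue is empty
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !items_.empty(); });
        Message* m = items_.front();
        items_.pop_front();
        return m;
    }
    std::size_t size() {
        std::lock_guard<std::mutex> lock(mutex_);
        return items_.size();
    }
private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::deque<Message*> items_;
};

// Starts `producers` threads that each send the values 1..per_producer through a
// pool of `pool_size` preallocated messages. The calling thread is the single
// consumer. Returns the sum of all values the consumer received.
long run_pool_demo(int producers, int per_producer, std::size_t pool_size) {
    std::vector<Message> storage(pool_size);  // all message storage is allocated once
    MessageQueue pool, work;
    for (Message& m : storage) pool.push(&m); // load the pool at startup

    std::vector<std::thread> threads;
    for (int p = 0; p < producers; ++p) {
        threads.emplace_back([&pool, &work, p, per_producer] {
            for (int i = 1; i <= per_producer; ++i) {
                Message* m = pool.pop();      // blocks if all messages are in flight
                m->producer_id = p;
                m->value = i;
                work.push(m);
            }
        });
    }

    long sum = 0;
    for (int n = 0; n < producers * per_producer; ++n) {
        Message* m = work.pop();
        sum += m->value;                      // "use up" the message
        pool.push(m);                         // then recycle it
    }
    for (std::thread& t : threads) t.join();
    assert(pool.size() == pool_size);         // leak check: every message came back
    return sum;
}
```

Because producers block in `pool.pop()` when all N messages are in flight, the pool also acts as simple flow control: a slow consumer throttles the producers instead of letting the queue grow without bound.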

Martin James answered Nov 16 '22 16:11


If there is only a single consumer but multiple producers, then I would use an array, or some array-like data structure with O(1) access time, where each slot holds a single-producer/single-consumer queue, one per producer. The great advantage of a single-producer/single-consumer queue is that you can make it lock-free without any explicit locking, since each index is only ever written by one thread. That makes it a very fast data structure in a multi-threaded environment. See my answer here for a bare-bones implementation of a single-producer/single-consumer queue.
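As a rough sketch of that layout (not the implementation from the linked answer, which is not reproduced here), the code below gives each producer its own bounded SPSC ring buffer and lets the single consumer poll the slots in turn. The names `SpscQueue` and `run_spsc_array_demo`, the capacity of 64, and the yield-and-retry policy are all assumptions made for illustration. It uses `std::atomic` and `std::thread` from C++11 so that it is self-contained; `boost::atomic` and `boost::thread` provide equivalent interfaces, and newer Boost releases also ship `boost::lockfree::spsc_queue`.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <memory>
#include <thread>
#include <vector>

// Bounded single-producer/single-consumer ring buffer (illustrative sketch).
// Exactly one thread may call push() and exactly one thread may call pop().
// One slot is always left unused, so it holds at most Capacity - 1 items.
template <typename T, std::size_t Capacity>
class SpscQueue {
public:
    bool push(const T& value) {  // producer side
        const std::size_t tail = tail_.load(std::memory_order_relaxed);
        const std::size_t next = (tail + 1) % Capacity;
        if (next == head_.load(std::memory_order_acquire)) return false;  // full
        slots_[tail] = value;
        tail_.store(next, std::memory_order_release);  // publish the filled slot
        return true;
    }
    bool pop(T& out) {  // consumer side
        const std::size_t head = head_.load(std::memory_order_relaxed);
        if (head == tail_.load(std::memory_order_acquire)) return false;  // empty
        out = slots_[head];
        head_.store((head + 1) % Capacity, std::memory_order_release);  // free the slot
        return true;
    }
private:
    T slots_[Capacity];
    std::atomic<std::size_t> head_{0};  // written only by the consumer
    std::atomic<std::size_t> tail_{0};  // written only by the producer
};

// One queue per producer; the calling thread is the single consumer and polls
// every slot round-robin. Each producer sends the values 1..per_producer.
// Returns the sum of all values received.
long run_spsc_array_demo(int producers, int per_producer) {
    typedef SpscQueue<int, 64> Queue;
    std::vector<std::unique_ptr<Queue>> queues;
    for (int p = 0; p < producers; ++p)
        queues.push_back(std::unique_ptr<Queue>(new Queue()));

    std::vector<std::thread> threads;
    for (int p = 0; p < producers; ++p) {
        Queue* q = queues[p].get();
        threads.emplace_back([q, per_producer] {
            for (int i = 1; i <= per_producer; ++i)
                while (!q->push(i)) std::this_thread::yield();  // queue full: retry
        });
    }

    long sum = 0;
    int remaining = producers * per_producer;
    while (remaining > 0) {
        bool got_any = false;
        for (std::unique_ptr<Queue>& q : queues) {
            int value = 0;
            while (q->pop(value)) { sum += value; --remaining; got_any = true; }
        }
        if (!got_any) std::this_thread::yield();  // nothing ready anywhere: back off
    }
    for (std::thread& t : threads) t.join();
    return sum;
}
```

The trade-off in this sketch is that the consumer polls rather than blocks, so an idle consumer still burns some CPU. If that matters, the polling loop could be paired with a condition variable in the way the first answer does.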

Jason answered Nov 16 '22 17:11