I need a queue for passing messages from one thread (A) to another (B). However, I've not been able to find one that really does what I want, since they generally allow adding an item to fail. In my situation that case is pretty much fatal: the message needs to be processed, and the thread really can't stop and wait for spare room.
Here's how to write a lock-free queue in C++:
http://www.ddj.com/hpc-high-performance-computing/210604448
But when you say "thread A must not block", are you sure that's the requirement? Windows is not a real-time operating system (and neither is Linux, in normal use). If you want thread A to be able to use all available system memory, then it needs to allocate memory (or wait while someone else does). The OS itself cannot provide timing guarantees any better than those you'd have if both reader and writer took an in-process lock (i.e. a non-shared mutex) in order to manipulate the list. And in the worst case, adding a message is going to have to go to the OS to get memory.
In short, there's a reason those queues you don't like have a fixed capacity - it's so that they don't have to allocate memory in the supposedly low-latency thread.
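To make that trade-off concrete, here is a minimal sketch of a fixed-capacity single-producer/single-consumer ring buffer. It is not taken from the linked article; the class name `BoundedSpscQueue` and its methods are made up for illustration. The point is only that `try_push` never allocates, which is exactly why it has to be able to report "full".

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical name: fixed-capacity single-producer/single-consumer ring buffer.
// One slot is kept empty to tell "full" from "empty", so it holds N - 1 items.
template <typename T, std::size_t N>
class BoundedSpscQueue {
public:
    // Producer thread only. Returns false instead of allocating when there is no room.
    bool try_push(const T& value) {
        const std::size_t tail = tail_.load(std::memory_order_relaxed);
        const std::size_t next = (tail + 1) % N;
        if (next == head_.load(std::memory_order_acquire)) return false;  // full
        slots_[tail] = value;
        tail_.store(next, std::memory_order_release);  // publish the new item
        return true;
    }

    // Consumer thread only. Returns false when the queue is empty.
    bool try_pop(T& out) {
        const std::size_t head = head_.load(std::memory_order_relaxed);
        if (head == tail_.load(std::memory_order_acquire)) return false;  // empty
        out = slots_[head];
        head_.store((head + 1) % N, std::memory_order_release);  // free the slot
        return true;
    }

private:
    std::array<T, N> slots_{};
    std::atomic<std::size_t> head_{0};  // next slot to read
    std::atomic<std::size_t> tail_{0};  // next slot to write
};

// Usage: with N = 4 only three slots are usable, so the fourth push is refused.
bool fourth_push_fails() {
    BoundedSpscQueue<int, 4> q;
    assert(q.try_push(1));
    assert(q.try_push(2));
    assert(q.try_push(3));
    return !q.try_push(4);
}
```

Removing that failure case means growing the storage on demand, and growing the storage means the writer may end up calling the allocator.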
So the lock-free code will generally be less block-y, but due to the memory allocation it isn't guaranteed to be, and performance with a mutex shouldn't be all that shabby unless you have a truly huge stream of events to process (like, you're writing a network driver and the messages are incoming ethernet packets).
So, in pseudo-code, the first thing I'd try would be:
Writer:
    allocate message and fill it in
    acquire lock
        append node to intrusive list
        signal condition variable
    release lock

Reader:
    for(;;)
        acquire lock
            for(;;)
                if there's a node
                    remove it
                    break
                else
                    wait on condition variable
                endif
            endfor
        release lock
        process message
        free message
    endfor
Only if this proves to introduce unacceptable delays in the writer thread would I go to lock-free code (unless I happened to have a suitable queue already lying around).
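For reference, the pseudo-code above could be sketched in standard C++11 roughly as follows. The names `BlockingQueue` and `run_demo` are invented for this example, and it swaps the intrusive list for a `std::deque`, so any allocation happens inside the lock rather than before it. Treat it as a starting point under those assumptions, not a tuned implementation.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical name: an unbounded queue guarded by a mutex and a condition variable.
template <typename T>
class BlockingQueue {
public:
    // Writer side: never waits for free space, only for the short critical section.
    void push(T value) {
        std::lock_guard<std::mutex> lock(mutex_);
        items_.push_back(std::move(value));  // may allocate (std::deque, not an intrusive list)
        ready_.notify_one();                 // signal while holding the lock, as in the pseudo-code
    }

    // Reader side: blocks until a message is available, then removes and returns it.
    T pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !items_.empty(); });  // handles spurious wakeups
        T value = std::move(items_.front());
        items_.pop_front();
        return value;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::deque<T> items_;
};

// Sends 0..count-1 followed by a -1 sentinel; returns the messages the reader saw.
std::vector<int> run_demo(int count) {
    BlockingQueue<int> queue;
    std::vector<int> received;

    std::thread reader([&] {
        for (;;) {
            int msg = queue.pop();   // lock is already released here
            if (msg == -1) break;    // sentinel: writer is done
            received.push_back(msg); // "process message"
        }
    });

    for (int i = 0; i < count; ++i) queue.push(i);
    queue.push(-1);
    reader.join();
    assert(received.size() == static_cast<std::size_t>(count));
    return received;
}
```

The writer holds the lock only for the append and the signal, so the time it can spend waiting on the reader is bounded by the reader's equally short remove step.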
Visual Studio 2010 is adding two new libraries which support this scenario very well: the Asynchronous Agents Library and the Parallel Patterns Library.
The Agents Library has support for asynchronous message passing and contains message blocks for sending messages to 'targets' and for receiving messages from 'sources'.
An unbounded_buffer is a template class which offers what I believe you are looking for:
#include <agents.h>
#include <ppl.h>
#include <iostream>

using namespace ::Concurrency;
using namespace ::std;

int main()
{
    //to hold our messages, the buffer is unbounded...
    unbounded_buffer<int> buf1;
    task_group tasks;

    //thread 1 sends messages to the unbounded_buffer
    //without blocking
    tasks.run([&buf1](){
        for(int i = 0 ; i < 10000; ++i)
            send(&buf1, i);
        //signal exit
        send(&buf1, -1);
    });

    //thread 2 receives messages and blocks if there are none
    tasks.run([&buf1](){
        int result;
        //parenthesize the assignment: != binds tighter than =
        while((result = receive(&buf1)) != -1)
        {
            cout << "I got a " << result << endl;
        }
    });

    //wait for the threads to end
    tasks.wait();
}