I am trying to write some multithreaded code to read from a DAQ device and render the captured signal at the same time:
std::atomic <bool> rendering (false);
auto render = [&rendering, &display, &signal] (void)
{
    while (not rendering)
        {std::this_thread::yield ();};
    do {display.draw (signal);}
    while (display.rendering ()); // returns false when user quits
    rendering = false;
};
auto capture = [&rendering, &daq] (void)
{
    for (int i = daq.read_frequency (); i --> 0;)
        daq.record (); // fill the buffer before displaying the signal
    rendering = true;
    do {daq.record ();}
    while (rendering);
    daq.stop ();
};
std::thread rendering_thread (render);
std::thread capturing_thread (capture);
rendering_thread.join ();
capturing_thread.join ();
Sometimes this works fine, but usually I get really bad stuttering. I had render () and capture () print a line on each loop iteration, and then colored the lines so that red is from render () and blue is from capture ().
The left plot is from a smooth run; the right plot is from a run with stuttering.
I had roughly the equivalent program in C using OpenMP, and its performance was always smooth:
int status = 0;
int tid; /* must be declared before it can appear in private() */
#pragma omp parallel num_threads(2) private(tid) shared(status)
/* READ AND DRAW */ {
    tid = omp_get_thread_num ();
    /* DRAW */ if (tid == 0) {
        int finished = 0;
        while (!finished) {
            #pragma omp critical
            /* GET JOB STATUS */ {
                finished = status;
            }
            finished = renderDisplay ();
        }
        #pragma omp critical
        /* TERMINATE DISPLAY */ {
            cvDestroyAllWindows();
        }
        #pragma omp atomic
        status ++;
        #pragma omp flush(status)
    }
    /* READ */ if (tid == 1) {
        int finished = 0;
        while (!finished) {
            #pragma omp critical
            /* GET JOB STATUS */ {
                finished = status;
            }
            captureSignal ();
        }
    }
    #pragma omp barrier
}
As far as I can tell, the C and C++11 versions are equivalent, but I can't figure out why the stuttering happens only in the C++11 version.
I can't post an SSCCE because the daq.* routines all depend on the NI DAQ library, but it may be worth noting that daq.record () blocks until the physical device has finished reading, and that the NI DAQ library itself spawns several threads when it starts.
I've tried implementing atomic flags in various configurations and changing the order of function calls, and nothing seems to have an effect.
What is going on here, and how can I control it?
Update: increasing the sampling rate of the DAQ alleviates the problem, which leads me to strongly suspect that this has something to do with daq.record () being a blocking call.
As people in the comments have mentioned, you don't have much control over the scheduling. What will probably help you more is to move away from spin-waiting (the yield () loop) and use a condition variable. A condition variable puts the render thread to sleep whenever it runs ahead and has processed all the data the capture thread has produced. You can look at this example for a single iteration. In your case, you need to call notify_one () every time the capture thread makes more data available. For your case you can use the version of wait that takes only one parameter.
So your code will become something like this:
// Requires <atomic>, <condition_variable>, <mutex> and <thread>.
std::mutex mutex;
std::condition_variable condition;
std::atomic <bool> rendering (false);
// mutex and condition are locals, so the lambdas must capture them too
auto render = [&rendering, &display, &signal, &mutex, &condition] (void)
{
    // This spin-wait is no longer strictly needed, because the render thread
    // now waits for a signal before drawing; keeping it also ensures that a
    // spurious wakeup cannot cause a draw before the buffer is full.
    while (not rendering)
        {std::this_thread::yield ();};
    // first we lock. destructor will unlock for us
    std::unique_lock<std::mutex> lock(mutex);
    do {
        // this will wait until we have been signaled
        condition.wait(lock);
        // maybe check display.rendering() and exit (depending on your req.)
        // process all data available
        display.draw (signal);
    } while (display.rendering ()); // returns false when user quits
    rendering = false;
};
auto capture = [&rendering, &daq, &condition] (void)
{
    for (int i = daq.read_frequency (); i --> 0;)
        daq.record (); // fill the buffer before displaying the signal
    rendering = true;
    condition.notify_one();
    // special note: you can call notify_one() here without
    // holding the mutex.
    do {daq.record (); condition.notify_one();}
    while (rendering);
    daq.stop ();
    // signal one more time as the render thread could have
    // been in "wait()" call
    condition.notify_one();
};
std::thread rendering_thread (render);
std::thread capturing_thread (capture);
rendering_thread.join ();
capturing_thread.join ();
Doing it this way also uses less CPU, because the render thread sleeps when there is no data to process.