
How to avoid race conditions in a condition variable in VxWorks

We're programming on a proprietary embedded platform sitting atop VxWorks 5.5. In our toolbox, we have a condition variable that is implemented using a VxWorks binary semaphore.

Now, POSIX provides a wait function that also takes a mutex. It atomically unlocks the mutex (so that some other task can write to the data) and waits for the other task to signal (that it is done writing the data). I believe this implements what's called a Monitor, ICBWT.

We need such a wait function, but implementing it is tricky. A simple approach would do this:

bool condition::wait_for(mutex& mutex) const {
    unlocker ul(mutex);    // relinquish mutex
    return wait(event);
}                          // ul's dtor grabs mutex again

However, this has a race condition, because it allows another task to preempt this one after the unlocking and before the waiting. The other task can write to the data after the mutex was unlocked and signal the condition before this task starts to wait for the semaphore. (We have tested this; it does happen, and it blocks the waiting task forever.)
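The interleaving described above can be replayed deterministically on a host machine. The following is only a sketch in standard C++11, not VxWorks code: `PulseEvent` and `replay_lost_wakeup` are hypothetical names, and the sketch assumes that the signal wakes only tasks that are already blocked (which is what `semFlush()` does on a VxWorks semaphore). Whether the original `wait`/`signal` pair behaves this way is an assumption.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical stand-in for an event whose signal() wakes only the tasks
// that are already blocked. A signal sent while nobody waits leaves no trace.
class PulseEvent {
public:
    void signal() {
        std::lock_guard<std::mutex> guard(m_);
        ++generation_;
        cv_.notify_all();
    }

    // Returns true if a signal arrived while waiting, false on timeout.
    bool wait(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> guard(m_);
        const unsigned long start = generation_;
        return cv_.wait_for(guard, timeout,
                            [&] { return generation_ != start; });
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    unsigned long generation_ = 0;
};

// Replays the problematic order in a single thread:
// unlock, signal (as if preempted by the other task), then wait.
inline bool replay_lost_wakeup() {
    std::mutex data_mutex;
    PulseEvent event;

    data_mutex.lock();
    data_mutex.unlock();   // the "unlocker" relinquishes the mutex
    event.signal();        // the other task runs here and signals
    const bool woke = event.wait(std::chrono::milliseconds(20));  // too late
    data_mutex.lock();     // the "unlocker" dtor grabs the mutex again
    data_mutex.unlock();
    return woke;
}
```

With a finite timeout the waiter gives up; with an infinite one it would block forever, which matches the observed behaviour.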

Given that VxWorks 5.5 doesn't seem to provide an API to temporarily relinquish a semaphore while waiting for a signal, is there a way to implement this on top of the provided synchronization routines?

Note: This is a very old VxWorks version that has been compiled without POSIX support (by the vendor of the proprietary hardware, from what I understood).

sbi asked Sep 30 '13 13:09


2 Answers

This should be quite easy with native VxWorks: a message queue is what is required here. Your wait_for method can be used as is.

bool condition::wait_for(mutex& mutex) const 
{
    unlocker ul(mutex);    // relinquish mutex
    return wait(event);
}                          // ul's dtor grabs mutex again

but the wait(event) code would look like this:

wait(event)
{
    // msgQReceive returns the number of bytes copied, or ERROR on timeout/failure
    if (msgQReceive(event->q, sigMsgBuf, sigMsgSize, timeoutTime) != ERROR)
    {
        // got it...
    }
    else
    {
        // timeout, report error or something like that....
    }
}

and your signal code would look something like this:

signal(event)
{
    msgQSend(event->q, sigMsg, sigMsgSize, NO_WAIT, MSG_PRI_NORMAL);
}

So if the signal gets triggered before you start waiting, the message stays in the queue. When msgQReceive is eventually invoked, it returns immediately with the signal, and you can then take the mutex again in the ul dtor as stated above.

The event->q is a MSG_Q_ID that is created at event creation time with a call to msgQCreate, and the data in sigMsg is defined by you... but can be just a random byte of data, or you can come up with a more intelligent structure with information regarding who signaled or something else that may be nice to know.
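To see why a queued signal cannot be lost, here is a minimal host-side sketch in standard C++11. It is not VxWorks code: `SignalQueue` is a hypothetical stand-in for a message queue, with `send` playing the role of `msgQSend` with `NO_WAIT` and `receive` the role of `msgQReceive` with a timeout.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical host-side stand-in for a bounded message queue.
class SignalQueue {
public:
    explicit SignalQueue(std::size_t max_msgs) : max_msgs_(max_msgs) {}

    // Non-blocking send: fails instead of waiting when the queue is full.
    bool send(char msg) {
        std::lock_guard<std::mutex> guard(m_);
        if (msgs_.size() >= max_msgs_) return false;
        msgs_.push_back(msg);
        cv_.notify_one();
        return true;
    }

    // Receive with timeout: true if a message was taken, false on timeout.
    bool receive(char& msg, std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> guard(m_);
        if (!cv_.wait_for(guard, timeout, [&] { return !msgs_.empty(); }))
            return false;
        msg = msgs_.front();
        msgs_.pop_front();
        return true;
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<char> msgs_;
    std::size_t max_msgs_;
};
```

A signal sent before the receiver starts waiting simply sits in the queue, so the later `receive` returns at once. With a depth of 1, a second signal sent before the first is consumed is rejected rather than queued; whether that is acceptable depends on the application.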

Update for multiple waiters (this is a little tricky). I will make a couple of assumptions to simplify things:

  1. The number of tasks that will be pending is known at event creation time and is constant.
  2. There will be one task that is always responsible for indicating when it is ok to unlock the mutex, all other tasks just want notification when the event is signaled/complete.

This approach uses a counting semaphore, similar to the above with just a little extra logic:

wait(event)
{
    if (semTake(event->csm, timeoutTime) == OK)
    {
        // got it...
    }
    else
    {
        // timeout, report error or something like that....
    }
}

and your signal code would look something like this:

signal(event)
{
    for (int x = 0; x < event->numberOfWaiters; x++)
    {
        semGive(event->csm);
    }
}

The creation of the event is something like this, remember in this example the number of waiters is constant and known at event creation time. You could make it dynamic, but the key is that every time the event is going to happen the numberOfWaiters must be correct before the unlocker unlocks the mutex.

createEvent(numberOfWaiters)
{
    event->numberOfWaiters = numberOfWaiters;
    event->csm = semCCreate(SEM_Q_FIFO, 0);   // counting semaphore, initially empty
    return event;
}

You cannot be wishy-washy about the numberOfWaiters :D I will say it again: The numberOfWaiters must be correct before the unlocker unlocks the mutex. To make it dynamic (if that is a requirement) you could add a setNumWaiters(numOfWaiters) function, and call that in the wait_for function before the unlocker unlocks the mutex, so long as it always sets the number correctly.

Now for the last trick, as stated above the assumption is that one task is responsible for unlocking the mutex, the rest just wait for the signal, which means that one and only one task will call the wait_for() function above, and the rest of the tasks just call the wait(event) function.

With this in mind the numberOfWaiters is computed as follows:

  • The number of tasks who will call wait()
  • plus 1 for the task that calls wait_for()

Of course you can also make this more complex if you really need to, but chances are this will work because normally 1 task triggers an event, but many tasks want to know it is complete, and that is what this provides.
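The counting scheme can be tried out on a host machine with the sketch below. It uses standard C++11 rather than VxWorks calls, and `CountingSem` and `Event` are hypothetical stand-ins for a semaphore created with `semCCreate` and for the event structure described above.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical host-side stand-in for a counting semaphore.
class CountingSem {
public:
    void give() {
        std::lock_guard<std::mutex> guard(m_);
        ++count_;
        cv_.notify_one();
    }

    // Returns true if a token was taken, false on timeout.
    bool take(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> guard(m_);
        if (!cv_.wait_for(guard, timeout, [&] { return count_ > 0; }))
            return false;
        --count_;
        return true;
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    unsigned count_ = 0;
};

// One token is given per expected waiter, so every waiter gets released
// even if the signal arrives before it starts waiting.
class Event {
public:
    explicit Event(int numberOfWaiters) : numberOfWaiters_(numberOfWaiters) {}

    void signal() {
        for (int i = 0; i < numberOfWaiters_; ++i) csm_.give();
    }

    bool wait(std::chrono::milliseconds timeout) { return csm_.take(timeout); }

private:
    int numberOfWaiters_;
    CountingSem csm_;
};
```

For example, an `Event` constructed with 3 lets exactly three `wait` calls succeed after one `signal`; a fourth times out. Note that nothing stops one task from taking two tokens if it waits twice before the others have run, which is why the waiter count has to match the actual callers.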

But your basic flow is as follows:

init()
{
    event = createEvent(3);   // taskB + taskC call wait(), plus 1 for taskA's wait_for()
}

eventHandler()
{
    locker l(mutex);
    doEventProcessing();
    signal(event);
}

taskA()
{
    doOperationThatTriggersAnEvent();
    wait_for(mutex);
    eventComplete();
}

taskB()
{
    doWhateverIWant();
    // now I need to know if the event has occurred...
    wait(event);
    coolNowIKnowThatIsDone();
}

taskC()
{
    taskCIsFun();
    wait(event);
    printf("event done!\n");
}

Writing the above, I feel like all OO concepts are dead, but hopefully you get the idea. In reality, wait and wait_for should take the same parameter, or take no parameter and instead be members of the same class that holds all the data they need. Nonetheless, that is the overview of how it works.

Chris Desjardins answered Oct 17 '22 16:10


Race conditions can be avoided if each waiting task waits on a separate binary semaphore. These semaphores must be registered in a container which the signaling task uses to unblock all waiting tasks. The container must be protected by a mutex.

The wait_for() method obtains a binary semaphore, waits on it and finally deletes it.

void condition::wait_for(mutex& mutex) {
    SEM_ID sem = semBCreate(SEM_Q_PRIORITY, SEM_EMPTY);
    {
        lock l(listeners_mutex);    // assure exclusive access to listeners container
        listeners.push_back(sem);       
    }                               // l's dtor unlocks listeners_mutex again

    unlocker ul(mutex);             // relinquish mutex
    semTake(sem, WAIT_FOREVER);

    {
        lock l(listeners_mutex);
        // remove sem from listeners
        // ...
        semDelete(sem);
    }
}                                   // ul's dtor grabs mutex again

The signal() method iterates over all registered semaphores and unlocks them.

void condition::signal() {
    lock l(listeners_mutex);
    for_each(listeners.begin(), listeners.end(), semGive);   // unblock every registered waiter
}

This approach ensures that wait_for() will never miss a signal. A disadvantage is the need for additional system resources. To avoid creating and destroying semaphores for every wait_for() call, a pool could be used.
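The registration idea can be checked on a host machine with the following sketch. It is written in standard C++11 instead of VxWorks calls; `BinarySem`, `Condition`, `add_listener` and `remove_listener` are hypothetical names, and a timeout parameter is added so the example cannot hang.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <list>
#include <memory>
#include <mutex>

// Hypothetical host-side stand-in for a binary semaphore, initially empty.
class BinarySem {
public:
    void give() {
        std::lock_guard<std::mutex> guard(m_);
        full_ = true;
        cv_.notify_one();
    }

    // Returns true if the semaphore was taken, false on timeout.
    bool take(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> guard(m_);
        if (!cv_.wait_for(guard, timeout, [&] { return full_; }))
            return false;
        full_ = false;
        return true;
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    bool full_ = false;
};

class Condition {
public:
    typedef std::shared_ptr<BinarySem> Listener;

    Listener add_listener() {
        Listener sem = std::make_shared<BinarySem>();
        std::lock_guard<std::mutex> guard(listeners_mutex_);
        listeners_.push_back(sem);
        return sem;
    }

    void remove_listener(const Listener& sem) {
        std::lock_guard<std::mutex> guard(listeners_mutex_);
        listeners_.remove(sem);
    }

    // Gives every registered semaphore, whether or not its owner blocks yet.
    void signal() {
        std::lock_guard<std::mutex> guard(listeners_mutex_);
        for (Listener& sem : listeners_) sem->give();
    }

    // The caller must hold `mutex`; it is held again on return.
    bool wait_for(std::mutex& mutex, std::chrono::milliseconds timeout) {
        Listener sem = add_listener();   // register before releasing the mutex
        mutex.unlock();
        const bool signaled = sem->take(timeout);
        remove_listener(sem);
        mutex.lock();
        return signaled;
    }

private:
    std::mutex listeners_mutex_;
    std::list<Listener> listeners_;
};
```

Because the semaphore is registered while the caller still holds the mutex, a signal that arrives between the unlock and the take is latched in that waiter's own semaphore instead of being lost.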

didymos_c answered Oct 17 '22 17:10