
Mutex Safety with Interrupts (Embedded Firmware)

Tags: c++, c, embedded
Edit: @Mike pointed out that my try_lock function in the code below is unsafe, and that accessor creation can produce a race condition as well. The suggestions (from everyone) have convinced me that I'm going down the wrong path.

Original Question

The requirements for locking on an embedded microcontroller are different enough from multithreading that I haven't been able to convert multithreading examples to my embedded applications. Typically I don't have an OS or threads of any kind, just main and whatever interrupt functions are called by the hardware periodically.

It's pretty common that I need to fill up a buffer from an interrupt, but process it in main. I've created the IrqMutex class below to try to safely implement this. Each piece of code trying to access the buffer is assigned a unique ID through IrqMutexAccessor, and then each can try_lock() and unlock(). A blocking lock() function doesn't work from interrupts because, unless you allow the interrupt to complete, no other code can execute, so the unlock() code never runs. I do, however, occasionally use a blocking lock from the main() code.

However, I know that the double-checked lock doesn't work without C++11 memory barriers (which aren't available on many embedded platforms). Honestly, despite reading quite a bit about it, I don't really understand how or why memory-access reordering can cause a problem. I think that the use of volatile std::sig_atomic_t (possibly combined with the use of unique IDs) makes this different from the double-checked lock. But I'm hoping someone can confirm that the following code is correct, explain why it isn't safe, or offer a better way to accomplish this.

class IrqMutex {
friend class IrqMutexAccessor;

private:
    std::sig_atomic_t accessorIdEnum;
    volatile std::sig_atomic_t owner;
protected:
    std::sig_atomic_t nextAccessor(void) { return ++accessorIdEnum; }
    bool have_lock(std::sig_atomic_t accessorId) {
        return (owner == accessorId);
    }
    bool try_lock(std::sig_atomic_t accessorId) {
        // Only try to get a lock, while it isn't already owned.
        while (owner == SIG_ATOMIC_MIN) {
            // <-- If an interrupt occurs here, both attempts can get a lock at the same time.

            // Try to take ownership of this Mutex.
            owner = accessorId; // SET

            // Double check that we are the owner.
            if (owner == accessorId) return true;

            // Someone else must have taken ownership between CHECK and SET.
            // If they released it after CHECK, we'll loop back and try again.
            // Otherwise someone else has a lock and we have failed.
        }        

        // This shouldn't happen unless they called try_lock on something they already owned.
        if (owner == accessorId) return true;

        // If someone else owns it, we failed.
        return false;
    }
    bool unlock(std::sig_atomic_t accessorId) {
        // Double check that the owner called this function (not strictly required)
        if (owner == accessorId) {
            owner = SIG_ATOMIC_MIN;
            return true;
        }
        
        // We still return true if the mutex was unlocked anyway.
        return (owner == SIG_ATOMIC_MIN);
    }
public:
    IrqMutex(void) : accessorIdEnum(SIG_ATOMIC_MIN), owner(SIG_ATOMIC_MIN) {}
};

// This class is used to manage our unique accessorId.
class IrqMutexAccessor {
friend class IrqMutex;
private:
    IrqMutex& mutex;
    const std::sig_atomic_t accessorId;
public:
    IrqMutexAccessor(IrqMutex& m) : mutex(m), accessorId(m.nextAccessor()) {}
    bool have_lock(void) { return mutex.have_lock(accessorId); }
    bool try_lock(void) { return mutex.try_lock(accessorId); }
    bool unlock(void) { return mutex.unlock(accessorId); }
};

Because there is one processor and no threading, the mutex serves what I think is a subtly different purpose than normal. There are two main use cases I run into repeatedly.

  1. The interrupt is a Producer: it takes ownership of a free buffer and loads it with a packet of data. The interrupt/Producer may keep its ownership lock for a long time, spanning multiple interrupt calls. The main function is the Consumer and takes ownership of a full buffer when it is ready to process it. The race condition rarely happens, but if the interrupt/Producer finishes with a packet and needs a new buffer while they are all full, it will take the oldest buffer (a dropped-packet event). If the main/Consumer has started to read and process that oldest buffer at exactly the same time, they will trample all over each other.
  2. The interrupt is just a quick change or increment of something (like a counter). However, if we want to reset the counter or jump to some new value from the main() code, we don't want to write to the counter while it is changing. Here main actually does a blocking loop to obtain a lock; however, I think it's almost impossible to actually have to wait for more than two attempts. Once main has the lock, any calls to the counter interrupt are skipped, but that's generally not a big deal for something like a counter. Then I update the counter value and unlock it so it can start incrementing again.

I realize these two samples are dumbed down a bit, but some version of these patterns occurs in many of the peripherals in every project I work on, and I'd like one piece of reusable code that can safely handle this across various embedded platforms. I included the C tag because all of this is directly convertible to C code, and on some embedded compilers that's all that is available. So I'm trying to find a general method that is guaranteed to work in both C and C++.

struct ExampleCounter {
    volatile long long int value;
    IrqMutex mutex;
} exampleCounter;

struct ExampleBuffer {
    volatile char data[256];
    volatile size_t index;
    IrqMutex mutex; // One mutex per buffer.
} exampleBuffers[2];

const volatile char * const REGISTER;

// This accessor shouldn't be created in an interrupt or a race condition can occur.
static IrqMutexAccessor myMutex(exampleCounter.mutex);
void __irqQuickFunction(void) {
    // Obtain a lock, add the data then unlock all within one function call.
    if (myMutex.try_lock()) {
        exampleCounter.value++;
        myMutex.unlock();
    } else {
        // If we failed to obtain the lock, we skip this update this one time.
    }
}

// These accessors shouldn't be created in an interrupt or a race condition can occur.
static IrqMutexAccessor myMutexes[2] = {
    IrqMutexAccessor(exampleBuffers[0].mutex),
    IrqMutexAccessor(exampleBuffers[1].mutex)
};
void __irqLongFunction(void) {
    static size_t bufferIndex = 0;

    // Check if we have a lock.
    if (!myMutexes[bufferIndex].have_lock() && !myMutexes[bufferIndex].try_lock()) {
        // If we can't get a lock try the other buffer
        bufferIndex = (bufferIndex + 1) % 2;

        // One buffer should always be available so the next line should always be successful.
        if (!myMutexes[bufferIndex].try_lock()) return;
    }
    
    // ... at this point we know we have a lock ...

    // Get data from the hardware and modify the buffer here.
    const char c = *REGISTER;
    exampleBuffers[bufferIndex].data[exampleBuffers[bufferIndex].index++] = c;

    // We may keep the lock for multiple function calls until the end of packet.
    static const char END_PACKET_SIGNAL = '\0';    
    if (c == END_PACKET_SIGNAL) {
        // Unlock this buffer so it can be read from main.
        myMutexes[bufferIndex].unlock();

        // Switch to the other buffer for next time.
        bufferIndex = (bufferIndex + 1) % 2;
    }
}

int main(void) {
    while (true) {
        // Mutex for counter
        static IrqMutexAccessor myCounterMutex(exampleCounter.mutex);

        // Change counter value
        if (EVERY_ONCE_IN_A_WHILE) {
            // Skip any updates that occur while we are updating the counter.
            while(!myCounterMutex.try_lock()) {
                // Wait for the interrupt to release its lock.
            }

            // Set the counter to a new value.
            exampleCounter.value = 500;

            // Updates will start again as soon as we unlock it.
            myCounterMutex.unlock();
        }

        // Mutexes for __irqLongFunction.
        static IrqMutexAccessor myBufferMutexes[2] = {
            IrqMutexAccessor(exampleBuffers[0].mutex),
            IrqMutexAccessor(exampleBuffers[1].mutex)
        };

        // Process buffers from __irqLongFunction.
        for (size_t i = 0; i < 2; i++) {
            // Obtain a lock so we can read the data.
            if (!myBufferMutexes[i].try_lock()) continue;

            // Check that the buffer isn't empty.
            if (exampleBuffers[i].index == 0) {
                myBufferMutexes[i].unlock(); // Don't forget to unlock.
                continue;
            }

            // ... read and do something with the data here ...
            exampleBuffers[i].index = 0;

            myBufferMutexes[i].unlock();
        }
    }
}

Also note that I used volatile on any variable that is read or written by the interrupt routine (unless the variable was only accessed from the interrupt, like the static bufferIndex value in __irqLongFunction). I've read that mutexes remove some of the need for volatile in multithreaded code, but I don't think that applies here. Did I use the right amount of volatile? I used it on: ExampleBuffer[].data[256], ExampleBuffer[].index, and ExampleCounter.value.

Andy Stangeland asked Dec 10 '14 19:12
1 Answer

I apologize for the long answer, but perhaps it is fitting for a long question.

To answer your first question, I would say that your implementation of IrqMutex is not safe. Let me try to explain where I see problems.

Function nextAccessor

std::sig_atomic_t nextAccessor(void) { return ++accessorIdEnum; }

This function has a race condition, because the increment operator is not atomic: std::sig_atomic_t only guarantees that individual reads and writes are indivisible, while ++ involves three operations: reading the current value of accessorIdEnum, incrementing it, and writing the result back. If two IrqMutexAccessors are created at the same time (say, one in main and one in an interrupt that fires mid-increment), it's possible that they both get the same ID.
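Where a C++11 &lt;atomic&gt; header is available (an assumption; many embedded toolchains predate it), the ID handout can be fixed with a single atomic read-modify-write. A minimal sketch:

```cpp
#include <atomic>

// Sketch assuming C++11 atomics: fetch_add performs the load, the
// increment, and the store as one indivisible operation, so two
// concurrent callers can never be handed the same ID.
std::atomic<int> accessorIdCounter{0};

int nextAccessorId(void) {
    return accessorIdCounter.fetch_add(1) + 1;  // hands out 1, 2, 3, ...
}
```

On a single-core microcontroller without &lt;atomic&gt;, the usual fallback is to wrap the increment in a brief interrupt-disable section.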

Function try_lock

The try_lock function also has a race condition. One context (e.g. main) can pass the while-loop condition, and then, before taking ownership, another context (e.g. an interrupt) can enter the loop and take ownership of the lock (returning true). When the first context resumes, it executes owner = accessorId and "also" takes the lock. The double check doesn't save you here: the interrupt has already run to completion and returned true by the time main overwrites owner, so main's check of owner == accessorId also passes. In short, main and an interrupt can both call try_lock on an unowned mutex at the same time and both return true.
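To make the window concrete: the CHECK (owner == SIG_ATOMIC_MIN) and the SET (owner = accessorId) are separate instructions, and an interrupt can fire between them. Where C++11 atomics exist (again an assumption about the toolchain), a compare-and-swap fuses the two into one indivisible step; a hedged sketch:

```cpp
#include <atomic>

const int NO_OWNER = 0;          // stand-in for SIG_ATOMIC_MIN
std::atomic<int> owner{NO_OWNER};

bool try_lock(int accessorId) {
    int expected = NO_OWNER;
    // The compare ("is it unowned?") and the exchange ("make me the
    // owner") happen as one indivisible operation; no interrupt can
    // land between them.
    return owner.compare_exchange_strong(expected, accessorId);
}

bool unlock(int accessorId) {
    int expected = accessorId;
    // Releases only if the caller actually holds the lock.
    return owner.compare_exchange_strong(expected, NO_OWNER);
}
```
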

Disabling interrupts by RAII

We can achieve some level of simplicity and encapsulation by using RAII for interrupt disabling, for example the following class:

class InterruptLock {
public:
    InterruptLock() { 
        prevInterruptState = currentInterruptState();
        disableInterrupts();
    }

    ~InterruptLock() { 
        restoreInterrupts(prevInterruptState);
    }
private:
    int prevInterruptState; // Whatever type this should be for the platform
    InterruptLock(const InterruptLock&); // Not copy-constructable
};
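The names currentInterruptState(), disableInterrupts(), and restoreInterrupts() are placeholders for whatever the target provides (for example, PRIMASK manipulation on Cortex-M, or saving SREG and calling cli() on AVR; those mappings are assumptions, check your platform headers). A host-side sketch with the hooks stubbed out shows why saving and restoring the previous state, rather than unconditionally re-enabling, makes the lock safe to nest:

```cpp
// Host-side sketch: the three platform hooks are stubbed with a global
// flag so the save/restore behavior can be checked off-target.
static int g_interruptsEnabled = 1;

int currentInterruptState(void)  { return g_interruptsEnabled; }
void disableInterrupts(void)     { g_interruptsEnabled = 0; }
void restoreInterrupts(int prev) { g_interruptsEnabled = prev; }

class InterruptLock {
public:
    InterruptLock() {
        prevInterruptState = currentInterruptState();
        disableInterrupts();
    }
    ~InterruptLock() {
        restoreInterrupts(prevInterruptState);
    }
private:
    int prevInterruptState;
    InterruptLock(const InterruptLock&); // Not copy-constructable
};

// Interrupts are off inside any lock scope...
int stateInsideLock(void) {
    InterruptLock lock;
    return g_interruptsEnabled;
}

// ...and because each lock restores the *previous* state rather than
// unconditionally re-enabling, an inner lock cannot turn interrupts
// back on underneath an outer one.
int stateAfterNestedLocks(void) {
    {
        InterruptLock outer;         // remembers "enabled", disables
        {
            InterruptLock inner;     // remembers "disabled"
        }                            // inner restores "disabled": still off
    }                                // outer restores "enabled"
    return g_interruptsEnabled;
}
```
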

And I would recommend disabling interrupts to get the atomicity you need within the mutex implementation itself. For example something like:

bool try_lock(std::sig_atomic_t accessorId) {
    InterruptLock lock;
    if (owner == SIG_ATOMIC_MIN) {
        owner = accessorId;
        return true;
    }
    return false;
}
bool unlock(std::sig_atomic_t accessorId) {
    InterruptLock lock;
    if (owner == accessorId) {
        owner = SIG_ATOMIC_MIN;
        return true;
    }
    return false;
}

Depending on your platform, this might look different, but you get the idea.

As you said, this abstracts the disabling and enabling of interrupts away from general code, and encapsulates that concern in this one class.

Mutexes and Interrupts

Having said how I would consider implementing the mutex class, I would not actually use a mutex class for your use-cases. As you pointed out, mutexes don't really play well with interrupts, because an interrupt can't "block" on trying to acquire a mutex. For this reason, for code that directly exchanges data with an interrupt, I would instead strongly consider just directly disabling interrupts (for a very short time while the main "thread" touches the data).

So your counter might simply look like this:

volatile long long int exampleCounter;

void __irqQuickFunction(void) {
    exampleCounter++;
}
...
// Change counter value
if (EVERY_ONCE_IN_A_WHILE) {
    InterruptLock lock;
    exampleCounter = 500;
}

In my mind, this is easier to read, easier to reason about, and won't "slip" when there's contention (ie miss timer beats).

Regarding the buffer use-case, I would strongly recommend against holding a lock for multiple interrupt cycles. A lock/mutex should be held for just the slightest moment required to "touch" a piece of memory - just long enough to read or write it. Get in, get out.

So this is how the buffering example might look:

struct ExampleBuffer {
    char data[256];
} exampleBuffers[2];

ExampleBuffer* volatile bufferAwaitingConsumption = nullptr;
ExampleBuffer* volatile freeBuffer = &exampleBuffers[1];

const volatile char * const REGISTER;

void __irqLongFunction(void) {

    static const char END_PACKET_SIGNAL = '\0';    
    static size_t index = 0;
    static ExampleBuffer* receiveBuffer = &exampleBuffers[0];

    // Get data from the hardware and modify the buffer here.
    const char c = *REGISTER;
    receiveBuffer->data[index++] = c;

    // End of packet?
    if (c == END_PACKET_SIGNAL) {
        // Make the packet available to the consumer
        bufferAwaitingConsumption = receiveBuffer;
        // Move on to the next buffer
        receiveBuffer = freeBuffer;
        freeBuffer = nullptr;
        index = 0;
    }
}


int main(void) {

    while (true) {

        // Fetch packet from shared variable
        ExampleBuffer* packet;
        {
            InterruptLock lock;
            packet = bufferAwaitingConsumption;
            bufferAwaitingConsumption = nullptr;
        }

        if (packet) {
            // ... read and do something with the data here ...

            // Once we're done with the buffer, we need to release it back to the producer
            {
                InterruptLock lock;
                freeBuffer = packet;
            }
        }
    }
}

This code is arguably easier to reason about, since there are only two memory locations shared between the interrupt and the main loop: one to pass packets from the interrupt to the main loop, and one to pass empty buffers back to the interrupt. We also only touch those variables under "lock", and only for the minimum time needed to "move" the value. (For simplicity I've skipped over the buffer-overflow logic for when the main loop takes too long to free a buffer.)

It's true that in this case one may not even need the locks, since we're just reading and writing simple values, but the cost of disabling interrupts is small, and the risk of making mistakes otherwise is not worth it, in my opinion.

Edit

As pointed out in the comments, the above solution was meant only to tackle the multithreading problem, and omitted overflow checking. Here is a more complete solution, which should be robust under overflow conditions:

const size_t BUFFER_COUNT = 2; 

struct ExampleBuffer {
    char data[256];
    ExampleBuffer* next;
} exampleBuffers[BUFFER_COUNT];

volatile size_t overflowCount = 0;

class BufferList {
public:
    BufferList() : first(nullptr), last(nullptr) { }

    // Atomic enqueue
    void enqueue(ExampleBuffer* buffer) {
        InterruptLock lock;
        if (last)
            last->next = buffer;
        else {
            first = buffer;
            last = buffer;
        }
    }

    // Atomic dequeue (or returns null)
    ExampleBuffer* dequeueOrNull() {
        InterruptLock lock;
        ExampleBuffer* result = first;
        if (first) {
            first = first->next;
            if (!first)
                last = nullptr;
        }
        return result;
    }
private:
    ExampleBuffer* first;
    ExampleBuffer* last;
} freeBuffers, buffersAwaitingConsumption;

const volatile char * const REGISTER;

void __irqLongFunction(void) {

    static const char END_PACKET_SIGNAL = '\0';    
    static size_t index = 0;
    static ExampleBuffer* receiveBuffer = &exampleBuffers[0];

    // Recovery from overflow?
    if (!receiveBuffer) {
        // Try get another free buffer
        receiveBuffer = freeBuffers.dequeueOrNull();
        // Still no buffer?
        if (!receiveBuffer) {
            overflowCount++;
            return; 
        }
    }

    // Get data from the hardware and modify the buffer here.
    const char c = *REGISTER;

    if (index < sizeof(receiveBuffer->data))
        receiveBuffer->data[index++] = c;

    // End of packet?
    if (c == END_PACKET_SIGNAL) {
        // Make the packet available to the consumer
        buffersAwaitingConsumption.enqueue(receiveBuffer);
        // Move on to the next free buffer
        receiveBuffer = freeBuffers.dequeueOrNull();
        index = 0;
    }
}

size_t getAndResetOverflowCount() {
    InterruptLock lock;
    size_t result = overflowCount;
    overflowCount = 0;
    return result;
}


int main(void) {

    // All buffers are free at the start
    for (size_t i = 0; i < BUFFER_COUNT; i++)
        freeBuffers.enqueue(&exampleBuffers[i]);

    while (true) {

        // Fetch packet from shared variable
        ExampleBuffer* packet = buffersAwaitingConsumption.dequeueOrNull();

        if (packet) {
            // ... read and do something with the data here ...

            // Once we're done with the buffer, we need to release it back to the producer
            freeBuffers.enqueue(packet);
        }

        size_t overflowBytes = getAndResetOverflowCount();
        if (overflowBytes) {
            // ...
        }
    }
}

The key changes:

  • If the interrupt runs out of free buffers, it will recover
  • If the interrupt receives data while it doesn't have a receive buffer, it will communicate that to the main thread via getAndResetOverflowCount
  • If you keep getting buffer overflows, you can simply increase the buffer count
  • I've encapsulated the multithreaded access into a queue class implemented as a linked list (BufferList), which supports atomic dequeue and enqueue. The previous example also used queues, but of length 0-1 (either an item is enqueued or it isn't), and so the implementation of the queue was just a single variable. In the case of running out of free buffers, the receive queue could have 2 items, so I upgraded it to a proper queue rather than adding more shared variables.
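The queue's invariants are easy to verify off-target. The sketch below stubs InterruptLock to a no-op (purely an assumption so it compiles on a host) and checks that BufferList behaves as a FIFO; correctness hinges on advancing last and null-terminating the new tail on every enqueue:

```cpp
#include <cstddef>

// Host-side sketch of BufferList's FIFO behavior; on the device,
// InterruptLock would disable interrupts as shown earlier.
struct InterruptLock { };

struct ExampleBuffer {
    char data[256];
    ExampleBuffer* next;
};

class BufferList {
public:
    BufferList() : first(nullptr), last(nullptr) { }

    void enqueue(ExampleBuffer* buffer) {
        InterruptLock lock;
        buffer->next = nullptr;      // new tail terminates the list
        if (last)
            last->next = buffer;     // append after the current tail
        else
            first = buffer;          // list was empty
        last = buffer;               // the tail moves on every enqueue
    }

    ExampleBuffer* dequeueOrNull() {
        InterruptLock lock;
        ExampleBuffer* result = first;
        if (first) {
            first = first->next;
            if (!first)
                last = nullptr;      // queue drained: reset the tail too
        }
        return result;
    }
private:
    ExampleBuffer* first;
    ExampleBuffer* last;
};

// Two buffers queued at once must come back out in insertion order.
bool fifoOrderHolds() {
    static ExampleBuffer a, b;
    BufferList q;
    q.enqueue(&a);
    q.enqueue(&b);
    return q.dequeueOrNull() == &a
        && q.dequeueOrNull() == &b
        && q.dequeueOrNull() == nullptr;
}
```
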
Mike answered Sep 28 '22 04:09