
How to read std::queue shared with another thread?

My code acquires images and processes them. Performance is critical, so I've tried my hand at multi-threading. So far, only the acquisition part runs in a separate thread. I'm implementing a simple FIFO buffer using std::queue that stores the acquired images. The acquisition function, AcquireImages, writes raw image data to this buffer indefinitely until the user interrupts it. The processing function, ProcessImages, reads the buffer and processes the image data (currently in the main thread, but I plan to move it to a separate thread as well once I've ironed out the issues). Here's my code (reduced to a minimal, complete, and verifiable example):

#include <iostream>
#include <vector>
#include <queue>
#include <atomic>
#include <thread>

#define NUM_CAMERAS 2

void AcquireImages(std::queue<unsigned char*> &rawImageQueue, std::atomic<bool> &quit)
{
    unsigned char* rawImage{};

    while (!quit)
    {
        for (int camera = 0; camera < NUM_CAMERAS; camera++)
        {
            switch (camera)
            {
            case 0:
                rawImage = (unsigned char*)"Cam0Image";
                break;
            case 1:
                rawImage = (unsigned char*)"Cam1Image";
                break;
            default:
                break;
            }
            
            rawImageQueue.push(std::move(rawImage));
        }
    }
}

int ProcessImages(const std::vector<unsigned char*> &rawImageVec, const int count)
{
    // Do something to the raw image vector

    if (count > 10)
    {
        return 1;
    }
    else
    {
        return 0;
    } // In my application, this function only returns non-zero upon user interception.
}


int main()
{
    // Preparation
    std::vector<unsigned char*> rawImageVec;
    rawImageVec.reserve(NUM_CAMERAS);
    std::queue<unsigned char*> rawImageQueue;
    int count{};

    const unsigned int nThreads = 1;    // this might grow later

    std::atomic<bool> loopFlags[nThreads];
    std::thread       threads[nThreads];

    // Start threads
    for (int i = 0; i < nThreads; i++) {
        loopFlags[i] = false;
        threads[i] = std::thread(AcquireImages, rawImageQueue, ref(loopFlags[i]));
    }

    // Process images
    while (true)
    {
    
        // Process the images
        for (int cam{}; cam < NUM_CAMERAS; ++cam)
        {
            rawImageVec.push_back(rawImageQueue.front());
            rawImageQueue.pop();
        }

        int processResult = ProcessImages(move(rawImageVec), count);
        if (processResult)
        {
            std::cout << "Leaving while loop.\n"; // In my application this is triggered by the user
            break;
        }

        rawImageVec.clear();
        ++count;
    }

    // Shutdown other threads
    for (auto & flag : loopFlags) {
        flag = true;
    }

    // Wait for threads to actually finish.
    for (auto& thread : threads) {
        thread.join();
    }

    return 0;
}

Some of you may have already noticed my blunder. What I know is that this program throws an exception at rawImageVec.push_back(rawImageQueue.front());.

The output after throwing the exception reads as follows:

Debug Assertion Failed!

Program: C:\WINDOWS\SYSTEM32\MSVCP140D.dll
File: c:\program files (x86)\microsoft visual studio 14.0\vc\include\deque
Line: 329

Expression: deque iterator not dereferencable

I understand the cause of the issue is probably that I'm reading something that is shared with another thread (Am I correct?). How do I resolve this?

Following Praetorian's advice in the comments, I checked whether rawImageQueue is empty before reading from it, and I see that it's always empty. I'm not sure what's causing this.

db7638 asked Sep 07 '17 22:09



2 Answers

Here is a generalized example of a producer/consumer setup on a shared queue. The idea is that if you're writing to and reading from a data structure from different threads, you need some kind of protection around the accesses.

For this, the example below uses a condition variable and a mutex.

#include <algorithm>  // std::for_each
#include <thread>
#include <iostream>
#include <chrono>
#include <queue>
#include <mutex>
#include <vector>
#include <condition_variable>

using namespace std::chrono_literals;
using std::vector;
using std::thread;
using std::unique_lock;
using std::mutex;
using std::condition_variable;
using std::queue;

class WorkQueue
{
  condition_variable work_available;
  mutex work_mutex;
  queue<int> work;

public:
  void push_work(int item)
  {
    unique_lock<mutex> lock(work_mutex);

    bool was_empty = work.empty();
    work.push(item);

    lock.unlock();

    if (was_empty)
    {
      work_available.notify_one();
    }    
  }

  int wait_and_pop()
  {
    unique_lock<mutex> lock(work_mutex);
    while (work.empty())
    {
      work_available.wait(lock);
    }

    int tmp = work.front();
    work.pop();
    return tmp;
  }
};

int main() {
  WorkQueue work_queue;

  auto producer = [&]() {
    while (true) {
      work_queue.push_work(10);
      std::this_thread::sleep_for(2ms);
    }
  };

  vector<thread> producers;
  producers.push_back(std::thread(producer));
  producers.push_back(std::thread(producer));
  producers.push_back(std::thread(producer));
  producers.push_back(std::thread(producer));

  std::thread consumer([&]() {        
    while (true)
    {
      int work_to_do = work_queue.wait_and_pop();
      std::cout << "Got some work: " << work_to_do << std::endl;
    }
  });

  std::for_each(producers.begin(), producers.end(), [](thread &p) {
    p.join();
  });    

  consumer.join();  
}
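
Applied to the question's code, the same idea might look like the sketch below. One detail matters here: std::thread copies its arguments by default, so a queue passed without std::ref ends up (where the call compiles at all) as a private copy inside the thread, which would explain why the queue in main always looks empty. The names ImageQueue and RunPipeline are hypothetical, the queue holds std::string placeholders instead of raw image pointers, and the producer pushes a fixed number of frames rather than looping until a quit flag is set.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for the question's image buffer: a mutex-protected FIFO.
class ImageQueue
{
    std::mutex mutex_;
    std::condition_variable available_;
    std::queue<std::string> images_;

public:
    void push(std::string image)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            images_.push(std::move(image));
        }
        available_.notify_one();  // wake the consumer if it is waiting
    }

    std::string wait_and_pop()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate form re-checks the condition after every wakeup.
        available_.wait(lock, [this] { return !images_.empty(); });
        std::string image = std::move(images_.front());
        images_.pop();
        return image;
    }
};

// Producer: pushes one placeholder image per camera for each frame.
void AcquireImages(ImageQueue& queue, int frames)
{
    for (int i = 0; i < frames; ++i)
    {
        queue.push("Cam0Image");
        queue.push("Cam1Image");
    }
}

// Starts the producer on its own thread and consumes on the calling thread.
std::vector<std::string> RunPipeline(int frames)
{
    ImageQueue queue;
    // std::ref makes the thread operate on this queue, not on a copy of it.
    std::thread producer(AcquireImages, std::ref(queue), frames);

    std::vector<std::string> received;
    for (int i = 0; i < 2 * frames; ++i)
        received.push_back(queue.wait_and_pop());

    producer.join();
    return received;
}
```

Calling RunPipeline(3) from main returns six entries in the order they were pushed. Because the consumer blocks in wait_and_pop instead of calling front() on a possibly empty queue, the "deque iterator not dereferencable" assertion should not be reachable in this version.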
Josh answered Oct 06 '22 11:10



Your case is relatively simple, as it seems you have just one producer and one consumer. Image processing also sounds slow enough that thread contention shouldn't be a worry, and since you're switching from a single-threaded version, there's probably no need to bother with highly efficient lock-free implementations.

I'd recommend studying this pseudocode: https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem#Using_monitors, and then reading up on condition variables if you need to: http://en.cppreference.com/w/cpp/thread/condition_variable.
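
As a rough illustration of the monitor approach, here is a minimal C++ sketch of a bounded buffer: one mutex plus two condition variables, one for "not full" and one for "not empty". The names BoundedBuffer, add, remove and SumThroughBuffer are hypothetical, and the capacity of 4 is arbitrary. A bound is useful in the image case because it stops a fast camera thread from filling memory while processing lags behind.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical bounded FIFO in the monitor style: one mutex, two conditions.
template <typename T>
class BoundedBuffer
{
    std::mutex mutex_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
    std::queue<T> items_;
    const std::size_t capacity_;

public:
    explicit BoundedBuffer(std::size_t capacity) : capacity_(capacity)
    {
        assert(capacity > 0);
    }

    // Blocks while the buffer is full, then stores the item.
    void add(T item)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        not_full_.wait(lock, [this] { return items_.size() < capacity_; });
        items_.push(std::move(item));
        lock.unlock();
        not_empty_.notify_one();
    }

    // Blocks while the buffer is empty, then returns the oldest item.
    T remove()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_.wait(lock, [this] { return !items_.empty(); });
        T item = std::move(items_.front());
        items_.pop();
        lock.unlock();
        not_full_.notify_one();
        return item;
    }
};

// Demo helper: passes 1..n through a small buffer and sums what comes out.
int SumThroughBuffer(int n)
{
    BoundedBuffer<int> buffer(4);
    std::thread producer([&buffer, n] {
        for (int i = 1; i <= n; ++i)
            buffer.add(i);  // blocks whenever four items are already queued
    });

    int sum = 0;
    for (int i = 0; i < n; ++i)
        sum += buffer.remove();

    producer.join();
    return sum;
}
```

With one producer and one consumer, as in the question, notify_one is sufficient on both sides. The predicate overload of wait takes care of spurious wakeups, which is what the explicit while loop in the pseudocode does.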

Andriy Tylychko answered Oct 06 '22 11:10
