I have the following code in C++. The code is from C++ Concurrency In Action: Practical Multithreading
void do_work(unsigned id);
void f() {
std::vector<std::thread> threads;
for(unsigned i = 0; i < 20; ++i) {
threads.push_back(std::thread(do_work, i));
}
std::for_each(threads.begin(), threads.end(), std::mem_fn(&std::thread::join));
}
Suppose that threads[0] has completed processing and returns a value. I still have more files to process and would now like to assign this new file to a thread that is complete. How can I achieve this behavior in C++ ? Or must I destroy the thread and now create a new one upon thread completion ? But then how do I check if any of these threads have completed ?
Here's a basic implementation of what Sam Varshavchik has explained.
Live demo
The reason why I've added a local_queue
is to make sure our m_Mutex
is unlocked right away. If you remove it the thread that calls push_task
can potentially block.
The destructor calls stop()
which sets m_Running
to false
, notifies the thread about it, and waits for it to finish processing all remaining tasks.
If the worker class dies, the thread dies too, which is good.
My example only creates 3 threads and 5 tasks per thread for (int i = 0; i < 5; i++)
, mainly to make sure all the output is shown in ideone, but I've tested it with 10 threads and 5000 tasks per thread and it ran fine.
The do_work
function has two lines that you can uncomment if you want the output stream to be properly synchronized. This class has multi-thread support.
You can stop()
and restart()
the thread as many times as you like
class Worker
{
public:
Worker(bool start) : m_Running(start) { if (start) private_start(); }
Worker() : m_Running(false) { }
~Worker() { stop(); }
template<typename... Args>
void push_task(Args&&... args)
{
{
std::lock_guard<std::mutex> lk(m_Mutex);
m_Queue.push_back(std::bind(std::forward<Args>(args)...));
}
m_Condition.notify_all();
}
void start()
{
{
std::lock_guard<std::mutex> lk(m_Mutex);
if (m_Running == true) return;
m_Running = true;
}
private_start();
}
void stop()
{
{
std::lock_guard<std::mutex> lk(m_Mutex);
if (m_Running == false) return;
m_Running = false;
}
m_Condition.notify_all();
m_Thread.join();
}
private:
void private_start()
{
m_Thread = std::thread([this]
{
for (;;)
{
decltype(m_Queue) local_queue;
{
std::unique_lock<std::mutex> lk(m_Mutex);
m_Condition.wait(lk, [&] { return !m_Queue.empty() + !m_Running; });
if (!m_Running)
{
for (auto& func : m_Queue)
func();
m_Queue.clear();
return;
}
std::swap(m_Queue, local_queue);
}
for (auto& func : local_queue)
func();
}
});
}
private:
std::condition_variable m_Condition;
std::list<std::function<void()>> m_Queue;
std::mutex m_Mutex;
std::thread m_Thread;
bool m_Running = false;
};
void do_work(unsigned id)
{
//static std::mutex cout_mutex;
//std::lock_guard<std::mutex> lk(cout_mutex);
std::cout << id << std::endl;
}
int main()
{
{
Worker workers[3];
int counter = 0;
for (auto& worker : workers)
worker.start();
for (auto& worker : workers)
{
for (int i = 0; i < 5; i++)
worker.push_task(do_work, ++counter + i);
}
}
std::cout << "finish" << std::endl;
getchar();
return 0;
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With