Assigning a new task to a thread after the thread completes in C++

I have the following C++ code, taken from C++ Concurrency in Action: Practical Multithreading.

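#include <algorithm>   // std::for_each
#include <functional>  // std::mem_fn
#include <thread>
#include <vector>

void do_work(unsigned id);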

void f() {
    std::vector<std::thread> threads;
    for(unsigned i = 0; i < 20; ++i) {
        threads.push_back(std::thread(do_work, i));
    }
    std::for_each(threads.begin(), threads.end(), std::mem_fn(&std::thread::join));
}

Suppose that threads[0] has finished processing and returned a value. I still have more files to process, and I would like to assign the next file to a thread that has already finished. How can I achieve this in C++? Or must I destroy the finished thread and create a new one? And in that case, how do I check whether any of these threads has completed?

Mutating Algorithm asked Feb 07 '23 13:02


1 Answer

Here's a basic implementation of what Sam Varshavchik has explained.


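I added local_queue so that m_Mutex is released right away: the worker thread swaps the pending tasks into local_queue while it holds the lock and runs them only after releasing it. Without local_queue, the mutex would stay locked while the tasks run, and any thread calling push_task could block for that long.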
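
The swap step can be looked at in isolation. The following is a minimal sketch, assuming a hypothetical TaskQueue helper with push and drain functions (neither name is part of the Worker class); it only illustrates "swap under the lock, run outside the lock".

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <list>
#include <mutex>
#include <utility>

// Hypothetical helper, not part of the Worker class: it isolates the
// "swap under the lock, run outside the lock" step.
class TaskQueue
{
public:
    void push(std::function<void()> task)
    {
        std::lock_guard<std::mutex> lk(m_Mutex);
        m_Queue.push_back(std::move(task));
    }

    // Moves every pending task into a local list while holding the lock,
    // then runs the tasks after the lock has been released.
    // Returns the number of tasks that were run.
    std::size_t drain()
    {
        std::list<std::function<void()>> local_queue;
        {
            std::lock_guard<std::mutex> lk(m_Mutex);
            std::swap(m_Queue, local_queue); // constant time for std::list
        }

        for (auto& task : local_queue)
            task(); // other code may call push() while this runs

        return local_queue.size();
    }

private:
    std::mutex m_Mutex;
    std::list<std::function<void()>> m_Queue;
};

// Demonstration: the first task pushes a follow-up task while it runs.
// That push would deadlock if drain() still held m_Mutex during task().
int demo_drain()
{
    TaskQueue queue;
    int sum = 0;
    queue.push([&] { sum += 1; queue.push([&] { sum += 10; }); });
    queue.push([&] { sum += 2; });

    std::size_t first = queue.drain();  // runs the two original tasks
    std::size_t second = queue.drain(); // runs the follow-up task
    assert(first == 2 && second == 1);
    return sum;
}
```

Because the lock covers only the swap, the time a producer can spend waiting on the mutex does not depend on how long the tasks take.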

The destructor calls stop() which sets m_Running to false, notifies the thread about it, and waits for it to finish processing all remaining tasks.

When a Worker object is destroyed, its thread finishes too, which is what we want.

My example creates only 3 threads and pushes 5 tasks to each of them (the for (int i = 0; i < 5; i++) loop), mainly so that all the output is shown in ideone. I have also tested it with 10 threads and 5000 tasks per thread, and it ran fine.

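The do_work function has two commented-out lines that you can uncomment if you want the writes to the output stream to be properly synchronized. The class supports multiple threads: push_task locks m_Mutex, so several threads can push tasks to the same Worker.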
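
As a rough illustration of what those two lines do, here is a small sketch. The names log_line and demo_synchronized_output are made up for this example, and it writes to a std::ostringstream instead of std::cout so the result can be inspected. Concurrent writes to std::cout are not a data race, but the characters from different threads may interleave; a plain string stream shared between threads must always be guarded.

```cpp
#include <cassert>
#include <mutex>
#include <ostream>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for do_work: the function-local static mutex
// mirrors the two commented-out lines, so each line is written as a unit.
void log_line(std::ostream& out, unsigned id)
{
    static std::mutex out_mutex;
    std::lock_guard<std::mutex> lk(out_mutex);
    out << "task " << id << '\n';
}

// Starts 8 threads that each write 50 lines, then counts the lines and
// checks that none of them was mixed with output from another thread.
int demo_synchronized_output()
{
    std::ostringstream out;
    std::vector<std::thread> threads;
    for (unsigned i = 0; i < 8; ++i)
    {
        threads.emplace_back([&out, i] {
            for (int n = 0; n < 50; ++n)
                log_line(out, i);
        });
    }
    for (auto& t : threads)
        t.join();

    std::istringstream in(out.str());
    std::string line;
    int lines = 0;
    while (std::getline(in, line))
    {
        // Every line must look exactly like "task N" with a single digit N.
        assert(line.size() == 6 && line.compare(0, 5, "task ") == 0);
        ++lines;
    }
    return lines;
}
```

The static mutex is shared by every call to log_line, which is why a single lock_guard at the top of the function is enough.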

You can stop() the thread and start() it again as many times as you like.

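#include <condition_variable>
#include <cstdio>      // getchar
#include <functional>  // std::function, std::bind
#include <iostream>
#include <list>
#include <mutex>
#include <thread>
#include <utility>     // std::forward, std::swap

class Worker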
{
public:
    Worker(bool start) : m_Running(start) { if (start) private_start(); }
    Worker() : m_Running(false) { }
    ~Worker() { stop(); }

    template<typename... Args>
    void push_task(Args&&... args)
    {
        {
            std::lock_guard<std::mutex> lk(m_Mutex);
            m_Queue.push_back(std::bind(std::forward<Args>(args)...));
        }

        m_Condition.notify_all();
    }

    void start()
    {
        {
            std::lock_guard<std::mutex> lk(m_Mutex);
            if (m_Running == true) return;
            m_Running = true;
        }

        private_start();
    }

    void stop()
    {
        {
            std::lock_guard<std::mutex> lk(m_Mutex);
            if (m_Running == false) return;
            m_Running = false;
        }

        m_Condition.notify_all();
        m_Thread.join();
    }

private:
    void private_start()
    {
        m_Thread = std::thread([this]
        {
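            for (;;)
            {
                decltype(m_Queue) local_queue;
                bool running;
                {
                    std::unique_lock<std::mutex> lk(m_Mutex);
                    // Sleep until there is work to do or stop() was called.
                    m_Condition.wait(lk, [&] { return !m_Queue.empty() || !m_Running; });

                    running = m_Running;
                    // Take every pending task so the lock is released before they run.
                    std::swap(m_Queue, local_queue);
                }

                // Run outside the lock so push_task() never waits on a running task.
                for (auto& func : local_queue)
                    func();

                // After stop(), the remaining tasks have just been run; exit the thread.
                if (!running)
                    return;
            }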
        });
    }

private:
    std::condition_variable m_Condition;
    std::list<std::function<void()>> m_Queue;
    std::mutex m_Mutex;
    std::thread m_Thread;
    bool m_Running = false;
};

void do_work(unsigned id)
{
    //static std::mutex cout_mutex;
    //std::lock_guard<std::mutex> lk(cout_mutex);
    std::cout << id << std::endl;
}

int main()
{
    {
        Worker workers[3];
        int counter = 0;

        for (auto& worker : workers)
            worker.start();

        for (auto& worker : workers)
        {
            for (int i = 0; i < 5; i++)
                worker.push_task(do_work, ++counter + i);
        }
    }

    std::cout << "finish" << std::endl;
    getchar();

    return 0;
}
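
The Worker above gives each thread its own queue, so the caller decides which thread gets a task. If the goal is "hand the next file to whichever thread is free", one common variation is a single queue shared by all threads. The following is only a sketch of that idea, not the code from this answer: SharedPool, submit and demo_shared_pool are hypothetical names, and tasks are assumed to return int so that std::packaged_task can deliver the value through a std::future.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical SharedPool: a fixed set of threads pulling from one queue,
// so a task is picked up by whichever thread becomes idle first.
class SharedPool
{
public:
    explicit SharedPool(unsigned thread_count)
    {
        for (unsigned i = 0; i < thread_count; ++i)
            m_Threads.emplace_back([this] { run(); });
    }

    ~SharedPool()
    {
        {
            std::lock_guard<std::mutex> lk(m_Mutex);
            m_Running = false;
        }
        m_Condition.notify_all();
        for (auto& t : m_Threads)
            t.join(); // queued tasks are finished before the threads exit
    }

    // Queues a task and returns a future for its result.
    std::future<int> submit(std::function<int()> task)
    {
        // std::function needs a copyable callable, so the move-only
        // packaged_task is held through a shared_ptr.
        auto packaged = std::make_shared<std::packaged_task<int()>>(std::move(task));
        std::future<int> result = packaged->get_future();
        {
            std::lock_guard<std::mutex> lk(m_Mutex);
            m_Queue.push([packaged] { (*packaged)(); });
        }
        m_Condition.notify_one();
        return result;
    }

private:
    void run()
    {
        for (;;)
        {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lk(m_Mutex);
                m_Condition.wait(lk, [this] { return !m_Queue.empty() || !m_Running; });
                if (m_Queue.empty())
                    return; // only reached once m_Running is false
                task = std::move(m_Queue.front());
                m_Queue.pop();
            }
            task(); // run outside the lock
        }
    }

    std::mutex m_Mutex;
    std::condition_variable m_Condition;
    std::queue<std::function<void()>> m_Queue;
    std::vector<std::thread> m_Threads;
    bool m_Running = true;
};

// 20 "files" are processed by 3 threads; each task returns a value.
int demo_shared_pool()
{
    SharedPool pool(3);
    std::vector<std::future<int>> results;
    for (int file = 1; file <= 20; ++file)
        results.push_back(pool.submit([file] { return file * file; }));

    int total = 0;
    for (auto& r : results)
        total += r.get(); // blocks until that task has finished
    return total;
}
```

The returned future also covers the "has it completed?" part of the question: calling wait_for on it with a zero timeout returns std::future_status::ready once the result is available, without blocking.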

Jts answered Feb 10 '23 15:02