
When one worker thread fails, how to abort remaining workers?

I have a program which spawns multiple threads, each of which executes a long-running task. The main thread then waits for all worker threads to join, collects results, and exits.

If an error occurs in one of the workers, I want the remaining workers to stop gracefully, so that the main thread can exit shortly afterwards.

My question is how best to do this, when the implementation of the long-running task is provided by a library whose code I cannot modify.

Here is a simple sketch of the system, with no error handling:

void threadFunc()
{
    // Do long-running stuff
}

void mainFunc()
{
    std::vector<std::thread> threads;

    for (int i = 0; i < 3; ++i) {
        threads.push_back(std::thread(&threadFunc));
    }

    for (auto &t : threads) {
        t.join();
    }
}

If the long-running function executes a loop and I have access to the code, then execution can be aborted simply by checking a shared "keep on running" flag at the top of each iteration.

std::mutex mutex;
bool error;

void threadFunc()
{
    try {
        for (...) {
            {
                std::unique_lock<std::mutex> lock(mutex);
                if (error) {
                    break;
                }
            }
        }
    } catch (std::exception &) {
        std::unique_lock<std::mutex> lock(mutex);
        error = true;
    }
}
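For a plain boolean flag like this, the mutex can also be replaced by std::atomic<bool>. The following is a minimal sketch of that variant, not part of the original program: the names g_error, worker and runWorkers, and the simulated failure, are assumptions made up for illustration.

```cpp
#include <atomic>
#include <stdexcept>
#include <thread>
#include <vector>

// Shared "an error occurred" flag. std::atomic makes the read in the loop
// and the write in the handler race-free without a mutex.
std::atomic<bool> g_error{false};

// Hypothetical worker: runs up to maxIters iterations and throws at
// iteration failAt (pass a negative value for "never fail").
// Returns the number of iterations it completed.
int worker(int maxIters, int failAt)
{
    int done = 0;
    try {
        for (int i = 0; i < maxIters; ++i) {
            if (g_error.load()) {
                break; // another worker failed: stop early
            }
            if (i == failAt) {
                throw std::runtime_error("simulated failure");
            }
            ++done;
        }
    } catch (const std::exception &) {
        g_error.store(true); // tell the other workers to stop
    }
    return done;
}

// Runs one failing and two healthy workers; returns true if the flag was set.
bool runWorkers()
{
    g_error.store(false);
    std::vector<std::thread> threads;
    for (int t = 0; t < 3; ++t) {
        threads.emplace_back(worker, 1000000, t == 1 ? 5 : -1);
    }
    for (auto &th : threads) {
        th.join();
    }
    return g_error.load();
}
```

Calling runWorkers() joins all three threads and reports whether any of them failed. How many iterations the healthy workers complete before they notice the flag depends on scheduling.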

Now consider the case when the long-running operation is provided by a library:

std::mutex mutex;
bool error;

class Task
{
public:
    // Blocks until completion, error, or stop() is called
    void run();

    void stop();
};

void threadFunc(Task &task)
{
    try {
        task.run();
    } catch (std::exception &) {
        std::unique_lock<std::mutex> lock(mutex);
        error = true;
    }
}

In this case, the main thread has to handle the error, and call stop() on the still-running tasks. As such, it cannot simply wait for each worker to join() as in the original implementation.

The approach I have used so far is to share the following structure between the main thread and each worker:

struct SharedData
{
    std::mutex mutex;
    std::condition_variable condVar;
    bool error;
    int running;
};

When a worker finishes, whether successfully or not, it decrements the running count. If an exception is caught, the worker first sets the error flag. In both cases, it then calls condVar.notify_one().

The main thread then waits on the condition variable, waking up if either error is set or running reaches zero. On waking up, the main thread calls stop() on all tasks if error has been set.

This approach works, but I feel there should be a cleaner solution using some of the higher-level primitives in the standard concurrency library. Can anyone suggest an improved implementation?

Here is the complete code for my current solution:

// main.cpp

#include <chrono>
#include <condition_variable>
#include <exception>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

#include "utils.h"

// Class which encapsulates long-running task, and provides a mechanism for aborting it
class Task
{
public:
    Task(int tidx, bool fail)
    :   tidx(tidx)
    ,   fail(fail)
    ,   m_run(true)
    {

    }

    void run()
    {
        static const int NUM_ITERATIONS = 10;

        for (int iter = 0; iter < NUM_ITERATIONS; ++iter) {
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                if (!m_run) {
                    out() << "thread " << tidx << " aborting";
                    break;
                }
            }

            out() << "thread " << tidx << " iter " << iter;
            std::this_thread::sleep_for(std::chrono::milliseconds(100));

            if (fail) {
                throw std::exception();
            }
        }
    }

    void stop()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_run = false;
    }

    const int tidx;
    const bool fail;

private:
    std::mutex m_mutex;
    bool m_run;
};

// Data shared between all threads
struct SharedData
{
    std::mutex mutex;
    std::condition_variable condVar;
    bool error;
    int running;

    SharedData(int count)
    :   error(false)
    ,   running(count)
    {

    }
};

void threadFunc(Task &task, SharedData &shared)
{
    try {
        out() << "thread " << task.tidx << " starting";

        task.run(); // Blocks until task completes or is aborted by main thread

        out() << "thread " << task.tidx << " ended";
    } catch (std::exception &) {
        out() << "thread " << task.tidx << " failed";

        std::unique_lock<std::mutex> lock(shared.mutex);
        shared.error = true;
    }

    {
        std::unique_lock<std::mutex> lock(shared.mutex);
        --shared.running;
    }

    shared.condVar.notify_one();
}

int main(int argc, char **argv)
{
    static const int NUM_THREADS = 3;

    std::vector<std::unique_ptr<Task>> tasks(NUM_THREADS);
    std::vector<std::thread> threads(NUM_THREADS);

    SharedData shared(NUM_THREADS);

    for (int tidx = 0; tidx < NUM_THREADS; ++tidx) {
        const bool fail = (tidx == 1);
        tasks[tidx] = std::make_unique<Task>(tidx, fail);
        threads[tidx] = std::thread(&threadFunc, std::ref(*tasks[tidx]), std::ref(shared));
    }

    {
        std::unique_lock<std::mutex> lock(shared.mutex);

        // Wake up when either all tasks have completed, or any one has failed
        shared.condVar.wait(lock, [&shared](){
            return shared.error || !shared.running;
        });

        if (shared.error) {
            out() << "error occurred - terminating remaining tasks";
            for (auto &t : tasks) {
                t->stop();
            }
        }
    }

    for (int tidx = 0; tidx < NUM_THREADS; ++tidx) {
        out() << "waiting for thread " << tidx << " to join";
        threads[tidx].join();
        out() << "thread " << tidx << " joined";
    }

    out() << "program complete";

    return 0;
}

Some utility functions are defined here:

// utils.h

#include <iostream>
#include <mutex>
#include <thread>

#ifndef UTILS_H
#define UTILS_H

#if __cplusplus <= 201103L
// Backport std::make_unique from C++14
#include <memory>
namespace std {

template<typename T, typename ...Args>
std::unique_ptr<T> make_unique(
            Args&& ...args)
{
    return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
}

} // namespace std
#endif // __cplusplus <= 201103L

// Thread-safe wrapper around std::cout
class ThreadSafeStdOut
{
public:
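    ThreadSafeStdOut()
    :   m_lock(m_mutex)
    {

    }

    // Before C++17, returning by value from out() needs an accessible move
    // constructor even though the move itself is normally elided
    ThreadSafeStdOut(ThreadSafeStdOut &&) = default;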

    ~ThreadSafeStdOut()
    {
        std::cout << std::endl;
    }

    template <typename T>
    ThreadSafeStdOut &operator<<(const T &obj)
    {
        std::cout << obj;
        return *this;
    }

private:
    static std::mutex m_mutex;
    std::unique_lock<std::mutex> m_lock;
};

std::mutex ThreadSafeStdOut::m_mutex;

// Convenience function for performing thread-safe output
ThreadSafeStdOut out()
{
    return ThreadSafeStdOut();
}

#endif // UTILS_H
asked Aug 27 '15 10:08 by Gareth Stockwell



2 Answers

I've been thinking about your situation for some time, and this may be of some help to you. There are a few different methods you could try to achieve your goal. Any of the following three options may be of use, alone or in combination. I will show at least the first option, since I'm still learning the concepts of template specialization and lambdas.

  • Using a Manager Class
  • Using Template Specialization Encapsulation
  • Using Lambdas.

Pseudocode for a manager class would look something like this:

class ThreadManager {
private:
    std::unique_ptr<MainThread> mainThread_;
    std::list<std::shared_ptr<WorkerThread>> lWorkers_;  // List to hold finished workers
    std::queue<std::shared_ptr<WorkerThread>> qWorkers_; // Queue to hold inactive and waiting threads
    std::map<unsigned, std::shared_ptr<WorkerThread>> mThreadIds_; // Map to associate a WorkerThread with an ID value
    std::map<unsigned, bool> mFinishedThreads_; // Map to keep track of finished and unfinished threads

    bool threadError_; // Not needed if using exception handling

public:
    explicit ThreadManager( const MainThread& main_thread );

    void shutdownThread( const unsigned& threadId );
    void shutdownAllThreads();

    void addWorker( const WorkerThread& worker_thread );

    bool isThreadDone( const unsigned& threadId );

    void spawnMainThread() const; // Method to start main thread's work

    void spawnWorkerThread( unsigned threadId, bool& error );

    bool getThreadError( unsigned& threadID ); // Returns true if a thread encountered an error and passes back the ID of that thread
};

I used a bool to indicate that a thread failed only to keep the structure simple for demonstration. You can substitute whatever you prefer, such as exceptions or invalid unsigned values.

A class of this sort could be used as shown below. Note that it would be better as a Singleton, since you wouldn't want more than one manager class when working with shared pointers.

SomeClass::SomeClass( ... ) {
    // This class could contain a private static smart pointer to the manager class.
    // Initialize it by allocating the manager class and passing it the main thread object.
    threadManager_ = new ThreadManager( main_thread ); // Raw pointer shown only for simplicity; prefer a smart pointer
}

void SomeClass::addThreads( ... ) {
    for ( unsigned u = 1; u <= threadCount; u++ ) {
        threadManager_->addWorker( some_worker_thread );
    }
}

void SomeClass::someFunctionThatSpawnsThreads( ... ) {
    threadManager_->spawnMainThread();

    bool error = false;
    for ( unsigned u = 1; u <= threadCount; u++ ) {
        threadManager_->spawnWorkerThread( u, error );

        if ( error ) { // This thread failed to start, so shut down all threads
            threadManager_->shutdownAllThreads();
        }
    }

    // If all threads spawn successfully, listen here for one that fails.
    unsigned threadId;
    while ( threadManager_->getThreadError( threadId ) ) {
        // getThreadError() returned true and passed back the ID of the failed thread.
        // We can now go through a for loop and stop the active threads.
        for ( unsigned u = threadId + 1; u <= threadCount; u++ ) {
            threadManager_->shutdownThread( u );
        }

        // The threads have been shut down
        break;
    }
}

I like the manager class design because I have used it in other projects, and it often comes in handy when a code base contains many resources. A game engine, for example, has many assets such as sprites, textures, audio files, maps and game items, and a manager class helps to keep track of and maintain all of them. The same concept can be applied to managing active, inactive and waiting threads, so that one object knows how to handle and shut down all threads properly. If your code base and libraries support exceptions and thread-safe exception handling, I would recommend using an ExceptionHandler instead of passing bools around for errors. A Logger class is also useful: it can write to a log file and/or a console window, giving an explicit message that says which function threw the exception and what caused it. A log message might look like this:

Exception Thrown: someFunctionNamedThis in ThisFile on Line# (x)
    threadID 021342 failed to execute.

This way you can look at the log file and find out very quickly what thread is causing the exception, instead of using passed around bool variables.
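One way to carry an exception out of a worker thread, instead of a bool, is std::exception_ptr. The following is a minimal sketch of that idea rather than the ExceptionHandler or Logger classes mentioned above: FirstError, worker and runAndReport are hypothetical names, and the failure is simulated.

```cpp
#include <exception>
#include <functional>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

// Hypothetical holder for the first exception thrown by any worker.
struct FirstError
{
    std::mutex mutex;
    std::exception_ptr error; // null while no worker has failed

    void capture(std::exception_ptr e)
    {
        std::lock_guard<std::mutex> lock(mutex);
        if (!error) {
            error = e; // keep only the first failure
        }
    }
};

// Hypothetical worker: fails only when its id equals failId.
void worker(int id, int failId, FirstError &first)
{
    try {
        if (id == failId) {
            throw std::runtime_error("thread " + std::to_string(id) + " failed");
        }
    } catch (...) {
        first.capture(std::current_exception());
    }
}

// Runs the workers, then rethrows the stored exception on the calling
// thread so its message can be logged; returns "" if nothing failed.
std::string runAndReport(int numThreads, int failId)
{
    FirstError first;
    std::vector<std::thread> threads;
    for (int id = 0; id < numThreads; ++id) {
        threads.emplace_back(worker, id, failId, std::ref(first));
    }
    for (auto &t : threads) {
        t.join();
    }
    if (first.error) {
        try {
            std::rethrow_exception(first.error);
        } catch (const std::exception &e) {
            return e.what();
        }
    }
    return "";
}
```

With three workers and failId set to 1, runAndReport returns the message of the exception thrown by worker 1, which a logger could then write out together with the thread ID.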

answered Sep 16 '22 18:09 by Francis Cugler


The implementation of the long-running task is provided by a library whose code I cannot modify.

That means you have no way to synchronize the work done by the worker threads.

If an error occurs in one of the workers,

Let's suppose that you really can detect worker errors. Some of them are easy to detect if the library reports them; others are not, for example when:

  1. the library code loops forever.
  2. the library code exits prematurely with an uncaught exception.

I want the remaining workers to stop **gracefully**

That's just not possible.

The best you can do is write a thread manager that checks the status of the worker threads and, if an error condition is detected, (ungracefully) "kills" all the worker threads and exits.

You should also consider detecting a worker thread stuck in a loop (by timeout) and offering the user the option to kill it or to continue waiting for it to finish.
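Timeout detection of this kind could be sketched with std::async and std::future::wait_for. This is only an illustration under assumptions: longRunningTask is a hypothetical stand-in for the library call, and finishedWithin and runWithTimeout are made-up helper names.

```cpp
#include <chrono>
#include <future>
#include <thread>

// Hypothetical stand-in for a library call that takes workMs milliseconds.
void longRunningTask(int workMs)
{
    std::this_thread::sleep_for(std::chrono::milliseconds(workMs));
}

// Waits up to timeoutMs for the task; returns true if it finished in time.
bool finishedWithin(std::future<void> &task, int timeoutMs)
{
    return task.wait_for(std::chrono::milliseconds(timeoutMs))
        == std::future_status::ready;
}

// Launches the task on its own thread and reports whether it finished
// within timeoutMs. The future is waited on before returning, so the
// sketch never leaves a thread running.
bool runWithTimeout(int workMs, int timeoutMs)
{
    std::future<void> task =
        std::async(std::launch::async, longRunningTask, workMs);
    const bool inTime = finishedWithin(task, timeoutMs);
    task.get(); // blocks until the task really ends
    return inTime;
}
```

Note that this only detects the timeout. Standard C++ has no portable way to kill the thread, and the destructor of a future returned by std::async blocks until the task ends, so a task that never returns would still hang the program on exit.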

answered Sep 20 '22 18:09 by Pat