
Signaling main thread when std::future is ready to be retrieved

I'm trying to understand the std::async, std::future system. What I don't quite understand is how you deal with running multiple async "tasks", and then, based on what returns first, second, etc, running some additional code.

Example: Let's say your main thread is in a simple loop. Now, based on user input, you run several functions via std::async, and save the futures in a std::list.

My issue is, how do I pass information back from the std::async function that can specify which future is complete?

My main thread is basically in a message loop, and what I need to do is have a function run by std::async be able to queue a message that somehow specifies which future is complete. The issue is that the function doesn't have access to the future.

Am I just missing something?

Here is some pseudo-code of what I'm trying to accomplish; extra points if there is also a way to "cancel" the request using a cancellation token.

class RequestA
{
public:
    int input1;

    int output1;
};

main()
{
    while(1)
    {
       //check for completion
       // i.e. pop next "message"
       if(auto *completed_task = get_next_completed_task())
       {
          completed_task->run_continuation();
       }

       // other code to handle user input
       if(userSaidRunA())
       {
          // note that I don't want to use a raw pointer but
          // am not sure how to use future for this
          RequestA *a = new RequestA();
          run(a, OnRequestTypeAComplete);
       }

    }
}

void OnRequestTypeAComplete(RequestA &req)
{
    // Do stuff with req, want access to inputs and output
}
bpeikes asked Aug 24 '17 02:08


3 Answers

Unfortunately, C++11 std::future provides neither continuations nor cancellation. You can retrieve the result from a std::future only once. Moreover, a future returned from std::async blocks in its destructor until the task has finished. There is a group headed by Sean Parent at Adobe that implemented future, async, and task the way they should be, along with continuation functions such as when_all and when_any. That may be what you're looking for. In any case, have a look at this project: the code is of good quality and easy to read.

If platform-dependent solutions are also OK for you, you can look at those as well. On Windows, I know of the PPL library, which also has primitives with cancellation and continuation.

Viktor answered Nov 16 '22 11:11


You can create a struct containing a flag and pass a reference to that flag to your thread function.

Something a bit like this:

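#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <future>
#include <iostream>
#include <random>
#include <thread>
#include <vector>

// Stand-in for the hol::random_number helper used in the original answer
// (assumed here to return a random integer in [0, max]).
int random_number(int max)
{
    thread_local std::mt19937 gen{std::random_device{}()};
    return std::uniform_int_distribution<int>{0, max}(gen);
}

int stuff(std::atomic_bool& complete, std::size_t id)
{
    std::cout << "starting: " << id << '\n';

    // do stuff
    std::this_thread::sleep_for(std::chrono::milliseconds(random_number(3000)));

    // generate value
    int value = random_number(30);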

    // signal end
    complete = true;
    std::cout << "ended: " << id << " -> " << value << '\n';

    return value;
}

struct task
{
    std::future<int> fut;
    std::atomic_bool complete{false}; // explicitly start as "not complete"

    task() = default;
    task(task&& t): fut(std::move(t.fut)), complete(t.complete.load()) {}
};

int main()
{
    // list of tasks
    std::vector<task> tasks;

    // reserve enough space so that nothing gets reallocated
    // as that would invalidate the references to the atomic_bools
    // needed to signal the end of a thread
    tasks.reserve(3);

    // create a new task
    tasks.emplace_back();

    // start it running
    tasks.back().fut = std::async(std::launch::async, stuff, std::ref(tasks.back().complete), tasks.size());

    tasks.emplace_back();
    tasks.back().fut = std::async(std::launch::async, stuff, std::ref(tasks.back().complete), tasks.size());

    tasks.emplace_back();
    tasks.back().fut = std::async(std::launch::async, stuff, std::ref(tasks.back().complete), tasks.size());

    // Keep going as long as any of the tasks is incomplete
    while(std::any_of(std::begin(tasks), std::end(tasks),
        [](auto& t){ return !t.complete.load(); }))
    {

        // do some parallel stuff
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }

    // process the results

    int sum = 0;
    for(auto&& t: tasks)
        sum += t.fut.get();

    std::cout << "sum: " << sum << '\n';
}
Galik answered Nov 16 '22 11:11


Here is a solution that uses a std::unordered_map instead of a std::list and does not require you to modify your callables. Instead, you use a helper function that assigns an id to each task and notifies when it finishes:

class Tasks {
public:
    /*
     *  Helper to create the tasks in a safe way.
     *  lockTaskCreation is needed to guarantee newTask is (temporarily)
     *  assigned before it is moved to the list of tasks
     */
    template <class R, class ...Args>
    void createNewTask(const std::function<R(Args...)>& f, Args... args) {
        std::unique_lock<std::mutex> lock(mutex);
        std::lock_guard<std::mutex> lockTaskCreation(mutexTaskCreation);
        newTask = std::async(std::launch::async, executeAndNotify<R, Args...>,
            std::move(lock), f, std::forward<Args>(args)...);
    }

private:
    /*
     *  Assign an id to the task, execute it, and notify when it finishes
     */
    template <class R, class ...Args>
    static R executeAndNotify(std::unique_lock<std::mutex> lock,
        const std::function<R(Args...)>& f, Args... args)
    {
        {
            std::lock_guard<std::mutex> lockTaskCreation(mutexTaskCreation);
            tasks[std::this_thread::get_id()] = std::move(newTask);
        }
        lock.unlock();
        Notifier notifier;
        return f(std::forward<Args>(args)...);
    }

    /*
     *  Class to notify when a task is completed (follows RAII)
     */
    class Notifier {
    public:
        ~Notifier() {
            std::lock_guard<std::mutex> lock(mutex);
            finishedTasks.push(std::this_thread::get_id());
            cv.notify_one();
        }
    };

    /*
     *  Wait for a finished task.
     *  This function needs to be called in an infinite loop
     */
    static void waitForFinishedTask() {
        std::unique_lock<std::mutex> lock(mutex);
        cv.wait(lock, [] { return finishedTasks.size() || finish; });
        if (finishedTasks.size()) {
            auto threadId = finishedTasks.front();
            finishedTasks.pop();
            auto result = tasks.at(threadId).get();
            tasks.erase(threadId);
            std::cout << "task " << threadId
                << " returned: " << result << std::endl;
        }
    }

    static std::unordered_map<std::thread::id, std::future<int>> tasks;
    static std::mutex mutex;
    static std::mutex mutexTaskCreation;
    static std::queue<std::thread::id> finishedTasks;
    static std::condition_variable cv;
    static std::future<int> newTask;

    ...
};

...

Then, you can call an async task in this way:

int doSomething(int i) {
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
    return i;
}

int main() {
    Tasks tasks;
    tasks.createNewTask(std::function<decltype(doSomething)>(doSomething), 10);
    return 0;
}

See a complete implementation run on Coliru

whoan answered Nov 16 '22 11:11