I'm trying to understand the std::async / std::future system. What I don't quite understand is how you deal with running multiple async "tasks" and then, based on which one returns first, second, etc., running some additional code.
Example: Let's say your main thread is in a simple loop. Now, based on user input, you run several functions via std::async and save the futures in a std::list.
My issue is, how do I pass information back from the std::async function that can specify which future is complete?
My main thread is basically in a message loop, and what I need is for a function run by std::async to be able to queue a message that somehow specifies which future is complete. The issue is that the function doesn't have access to the future.
Am I just missing something?
Here is some pseudo-code of what I'm trying to accomplish; extra points if there is also a way to "cancel" the request using a cancellation token.
class RequestA
{
public:
    int input1;
    int output1;
};
main()
{
    while(1)
    {
        // check for completion
        // i.e. pop next "message"
        if(auto *completed_task = get_next_completed_task())
        {
            completed_task->run_continuation();
        }
        // other code to handle user input
        if(userSaidRunA())
        {
            // note that I don't want to use a raw pointer but
            // am not sure how to use future for this
            RequestA *a = new RequestA();
            run(a, OnRequestTypeAComplete);
        }
    }
}
void OnRequestTypeAComplete(RequestA &req)
{
    // Do stuff with req, want access to inputs and output
}
Unfortunately, C++11 std::future doesn't provide continuations or cancellation. You can retrieve the result from a std::future only once. Moreover, a future returned from std::async blocks in its destructor until the task has finished. There is a group headed by Sean Parent at Adobe that has implemented future, async, and task as they should be, along with continuation functions such as when_all and when_any. That may be what you're looking for. In any case, have a look at this project; the code is of good quality and easy to read.
If platform-dependent solutions are also acceptable, you can look at those too. For Windows, I know of the PPL library, which also has primitives with cancellation and continuation.
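With plain std::future, one way to find out which tasks have finished without blocking the main loop is to poll each future with a zero timeout. The following is a minimal sketch, assuming the tasks were started with std::launch::async; is_ready and drain_ready are hypothetical helper names, not part of the standard library.

```cpp
#include <chrono>
#include <future>
#include <vector>

// Returns true if the result can be fetched without blocking.
// Assumption: the future comes from std::launch::async (or a promise).
// A deferred task would report future_status::deferred and never "ready".
template <class T>
bool is_ready(const std::future<T>& f) {
    return f.valid() &&
           f.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}

// Hypothetical helper: collects the results of all finished futures and
// removes them from the container. Unfinished ones stay for the next poll.
inline std::vector<int> drain_ready(std::vector<std::future<int>>& pending) {
    std::vector<int> results;
    for (auto it = pending.begin(); it != pending.end();) {
        if (is_ready(*it)) {
            results.push_back(it->get());  // get() may be called only once
            it = pending.erase(it);
        } else {
            ++it;
        }
    }
    return results;
}
```

A message loop could call drain_ready once per iteration. The cost is that completion is only noticed on the next poll, which is usually fine for a loop that already wakes up regularly.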
You can create a struct containing a flag and pass a reference to that flag to your thread function.
Something a bit like this:
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <future>
#include <iostream>
#include <thread>
#include <vector>
// Note: hol::random_number is not defined in this snippet; it is assumed to be
// a helper that returns a random number up to the given bound.
int stuff(std::atomic_bool& complete, std::size_t id)
{
    std::cout << "starting: " << id << '\n';
    // do stuff
    std::this_thread::sleep_for(std::chrono::milliseconds(hol::random_number(3000)));
    // generate value
    int value = hol::random_number(30);
    // signal end
    complete = true;
    std::cout << "ended: " << id << " -> " << value << '\n';
    return value;
}
struct task
{
    std::future<int> fut;
    // initialize explicitly: before C++20 a default-constructed
    // std::atomic_bool holds an indeterminate value
    std::atomic_bool complete{false};
    task() = default;
    task(task&& t): fut(std::move(t.fut)), complete(t.complete.load()) {}
};
int main()
{
    // list of tasks
    std::vector<task> tasks;
    // reserve enough space so that nothing gets reallocated,
    // as that would invalidate the references to the atomic_bools
    // needed to signal the end of a thread
    tasks.reserve(3);
    // create a new task
    tasks.emplace_back();
    // start it running
    tasks.back().fut = std::async(std::launch::async, stuff, std::ref(tasks.back().complete), tasks.size());
    tasks.emplace_back();
    tasks.back().fut = std::async(std::launch::async, stuff, std::ref(tasks.back().complete), tasks.size());
    tasks.emplace_back();
    tasks.back().fut = std::async(std::launch::async, stuff, std::ref(tasks.back().complete), tasks.size());
    // Keep going as long as any of the tasks is incomplete
    while(std::any_of(std::begin(tasks), std::end(tasks),
        [](auto& t){ return !t.complete.load(); }))
    {
        // do some parallel stuff
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
    // process the results
    int sum = 0;
    for(auto&& t: tasks)
        sum += t.fut.get();
    std::cout << "sum: " << sum << '\n';
}
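The same flag-passing idea can be turned around for the "cancel" part of the question: instead of the task setting a flag for the caller, the caller sets a flag that the task checks. This is a minimal sketch of cooperative cancellation, assuming the work can be split into steps; CancelToken, make_cancel_token, and cancellable_work are hypothetical names introduced for illustration.

```cpp
#include <atomic>
#include <chrono>
#include <future>
#include <memory>
#include <thread>

// Hypothetical cancellation token: a shared flag that the caller may set.
// shared_ptr keeps the flag alive for as long as caller or task needs it.
using CancelToken = std::shared_ptr<std::atomic<bool>>;

inline CancelToken make_cancel_token() {
    return std::make_shared<std::atomic<bool>>(false);
}

// Worker that performs up to `steps` units of work and checks the token
// between units. Returns the number of units actually completed.
inline int cancellable_work(CancelToken token, int steps) {
    int done = 0;
    while (done < steps && !token->load()) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));  // one unit
        ++done;
    }
    return done;
}
```

The caller would start the task with std::async(std::launch::async, cancellable_work, token, n) and later call token->store(true). Cancellation is only cooperative: the task stops at its next check, and the future still has to be waited on or read.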
Here is a solution with a std::unordered_map instead of a std::list, in which you don't need to modify your callables. Instead, you use a helper function that assigns an id to each task and notifies when it finishes:
class Tasks {
public:
    /*
     * Helper to create the tasks in a safe way.
     * lockTaskCreation is needed to guarantee newTask is (temporarily)
     * assigned before it is moved to the list of tasks
     */
    template <class R, class ...Args>
    void createNewTask(const std::function<R(Args...)>& f, Args... args) {
        std::unique_lock<std::mutex> lock(mutex);
        std::lock_guard<std::mutex> lockTaskCreation(mutexTaskCreation);
        newTask = std::async(std::launch::async, executeAndNotify<R, Args...>,
                             std::move(lock), f, std::forward<Args>(args)...);
    }
private:
    /*
     * Assign an id to the task, execute it, and notify when finishes
     */
    template <class R, class ...Args>
    static R executeAndNotify(std::unique_lock<std::mutex> lock,
                              const std::function<R(Args...)>& f, Args... args)
    {
        {
            std::lock_guard<std::mutex> lockTaskCreation(mutexTaskCreation);
            tasks[std::this_thread::get_id()] = std::move(newTask);
        }
        // caution: this lock was acquired by the creating thread, and the
        // standard requires std::mutex to be unlocked by the owning thread
        lock.unlock();
        Notifier notifier;
        return f(std::forward<Args>(args)...);
    }
    /*
     * Class to notify when a task is completed (follows RAII)
     */
    class Notifier {
    public:
        ~Notifier() {
            std::lock_guard<std::mutex> lock(mutex);
            finishedTasks.push(std::this_thread::get_id());
            cv.notify_one();
        }
    };
    /*
     * Wait for a finished task.
     * This function needs to be called in an infinite loop
     */
    static void waitForFinishedTask() {
        std::unique_lock<std::mutex> lock(mutex);
        cv.wait(lock, [] { return finishedTasks.size() || finish; });
        if (finishedTasks.size()) {
            auto threadId = finishedTasks.front();
            finishedTasks.pop();
            auto result = tasks.at(threadId).get();
            tasks.erase(threadId);
            std::cout << "task " << threadId
                      << " returned: " << result << std::endl;
        }
    }
    static std::unordered_map<std::thread::id, std::future<int>> tasks;
    static std::mutex mutex;
    static std::mutex mutexTaskCreation;
    static std::queue<std::thread::id> finishedTasks;
    static std::condition_variable cv;
    static std::future<int> newTask;
    ...
};
...
Then, you can call an async task in this way:
int doSomething(int i) {
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
    return i;
}
int main() {
    Tasks tasks;
    tasks.createNewTask(std::function<decltype(doSomething)>(doSomething), 10);
    return 0;
}
See a complete implementation run on Coliru
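For comparison, the same "assign an id, notify on completion" idea can be sketched with a plain counter as the id, so no lock has to be handed to another thread. This is only an illustration under assumptions: TaskTracker, launch, and wait_next are hypothetical names, and the result type is fixed to int to mirror the std::future<int> map above.

```cpp
#include <condition_variable>
#include <cstddef>
#include <future>
#include <mutex>
#include <queue>
#include <unordered_map>
#include <utility>

// Hypothetical tracker: launches tasks and reports completions by id.
class TaskTracker {
public:
    // Launches f (a callable returning int) and returns its assigned id.
    template <class F>
    std::size_t launch(F f) {
        // Holding the lock until the future is stored guarantees that the
        // task cannot announce completion before its future is in the map.
        std::lock_guard<std::mutex> lock(mutex_);
        const std::size_t id = next_id_++;
        futures_.emplace(id, std::async(std::launch::async,
            [this, id, f]() mutable {
                Notifier n{*this, id};  // queues id on exit, even on throw
                return f();
            }));
        return id;
    }

    // Blocks until some task has finished; returns {id, result}.
    std::pair<std::size_t, int> wait_next() {
        std::size_t id;
        std::future<int> fut;
        {
            std::unique_lock<std::mutex> lock(mutex_);
            cv_.wait(lock, [this] { return !finished_.empty(); });
            id = finished_.front();
            finished_.pop();
            auto it = futures_.find(id);
            fut = std::move(it->second);
            futures_.erase(it);
        }
        return {id, fut.get()};  // get() runs outside the lock
    }

private:
    // RAII helper that queues the task id when the task body exits.
    struct Notifier {
        TaskTracker& tracker;
        std::size_t id;
        ~Notifier() {
            {
                std::lock_guard<std::mutex> lock(tracker.mutex_);
                tracker.finished_.push(id);
            }
            tracker.cv_.notify_one();
        }
    };

    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<std::size_t> finished_;
    std::size_t next_id_ = 0;
    // Declared last so it is destroyed first: destroying the futures waits
    // for running tasks while the members they use are still alive.
    std::unordered_map<std::size_t, std::future<int>> futures_;
};
```

A message loop would call launch when the user requests work, remember the returned id alongside its request object, and call wait_next (or a non-blocking variant) to learn which request completed.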