I am implementing a work-stealing algorithm and am writing a generic function wrapper that takes a promise as one of the variadic arguments to the wrapper template. I want to create tasks with these function wrappers and have each node communicate with its dependent nodes through promises. Each node maintains a list of dependent nodes and their promises/futures, and a node can run once all of its futures have been set. The promise type varies with the job the wrapped function does, because different jobs return different objects. For example, if a single algorithm is broken into separate operations such as read and decode a message, perform checks on the resulting object, and return the result of all checks, each of these actions fulfils a different promise type (object, boolean, result).
The book C++ Concurrency in Action has a function wrapper implementation; however, it doesn't handle this use case. In other references online, I've seen wrappers hardcoded to one std::promise type, which covers only a single result type.
Can someone advise how I can write a wrapper to achieve the following?
#include <exception>
#include <future>
#include <utility>

void add(int a, int b, std::promise<int>&& prms)
{
    int res = a + b;
    try {
        prms.set_value(res);
    }
    catch(...)
    {
        prms.set_exception(std::current_exception());
    }
}

int main()
{
    std::promise<int> prms;
    std::future<int> fut = prms.get_future();
    // FunctionWrapper is the wrapper I am trying to write (see below)
    FunctionWrapper myFunctor(add, 10, 20, std::move(prms));
    // add the functor to the queue and it will be retrieved by a thread
    // that executes the task. since i have the future, i can pass it to the
    // dependent worknode
}
I tried writing the wrapper as shown below, but I could not get it to work.
#ifndef FUNCTIONWRAPPER_HPP
#define FUNCTIONWRAPPER_HPP
template<typename F, typename R, typename... Args>
class FunctionWrapper
{
    class implbase
    {
    public:
        virtual ~implbase();
        virtual R execute(Args...)=0;
    };
    class impl : public implbase
    {
    public:
        impl(F&& f) : func(std::move(f)) {}
        virtual R execute(Args... args) { return func(args...); }
    private:
        F func;
    };
    std::shared_ptr<impl> internalFunc;
public:
    FunctionWrapper(F&& f) : internalFunc(0)
    {
        internalFunc = new impl<F, R, Args...>(f);
    }
    FunctionWrapper(const FunctionWrapper& other)
        : internalFunc(std::move(other.internalFunc))
    {}
    ~FunctionWrapper()
    {
        if(internalFunc)
            delete internalFunc;
    }
    R operator()(Args... args)
    {
        return internalFunc->execute(args...);
    }
    void swap(FunctionWrapper& other)
    {
        impl<R, Args...>* tmp = internalFunc;
        internalFunc = other.internalFunc;
        other.internalFunc = tmp;
    }
    FunctionWrapper& operator=(const FunctionWrapper& other)
    {
        FunctionWrapper(other).swap(*this);
        return *this;
    }
    FunctionWrapper& operator=(const F& f)
    {
        FunctionWrapper(f).swap(*this);
        return *this;
    }
};
#endif // FUNCTIONWRAPPER_HPP
C++11 has a wrapper for doing just this! It is called std::packaged_task.
It wraps a callable object (function object, lambda, function pointer, bind expression, etc.) and, via its get_future() method, provides a future that matches the return type of the function passed in.
Consider the following example:
#include <thread>
#include <future>
#include <functional>
#include <iostream>
using namespace std;

int add(int a, int b)
{
    return a + b;
}

int main()
{
    // Create a std::packaged_task and grab the future out of it.
    packaged_task<int()> myTask(bind(add, 10, 20));
    future<int> myFuture = myTask.get_future();
    // Here is where you would queue up the task in your example.
    // I'll launch it on another thread just to demonstrate how.
    thread myThread(std::move(myTask));
    myThread.detach();
    // myFuture.get() will block until the task completes,
    // or rethrow if the task throws an exception.
    cout << "The result is: " << myFuture.get() << endl;
    return 0;
}
As you can see, rather than passing in a promise, we are counting on the packaged_task to create the promise and give us the future.
Also, using a bind expression has allowed us to effectively hand arguments to the task to hold onto until it is called.
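Because the bound task takes no arguments, tasks with different result types can be hidden behind one common signature and share a single queue. The following is only a rough sketch of that idea, not a definitive implementation: `TaskQueue`, `submit`, `runAll` and `isPositive` are hypothetical names invented for illustration, and the plain `std::queue` stands in for a real thread-safe work-stealing queue. The `std::shared_ptr` is there because `std::packaged_task` is move-only, while `std::function` requires a copyable callable.

```cpp
#include <functional>
#include <future>
#include <memory>
#include <queue>
#include <utility>

// Hypothetical single-threaded stand-in for the work-stealing queue.
using TaskQueue = std::queue<std::function<void()>>;

// Binds a callable to its arguments, wraps it in a packaged_task,
// queues a type-erased void() job, and returns the matching future.
template <typename F, typename... Args>
auto submit(TaskQueue& queue, F&& f, Args&&... args)
    -> std::future<decltype(f(args...))>
{
    using R = decltype(f(args...));
    // shared_ptr makes the move-only packaged_task usable inside std::function.
    auto task = std::make_shared<std::packaged_task<R()>>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    std::future<R> fut = task->get_future();
    queue.push([task] { (*task)(); });
    return fut;
}

int add(int a, int b) { return a + b; }
bool isPositive(int x) { return x > 0; }

// Drains the queue the way a worker thread would.
void runAll(TaskQueue& queue)
{
    while (!queue.empty()) {
        queue.front()();
        queue.pop();
    }
}
```

With this sketch, `submit(queue, add, 10, 20)` yields a `std::future<int>` and `submit(queue, isPositive, -5)` yields a `std::future<bool>`, even though both jobs sit in the same queue. Each future can be handed to a dependent node, and it becomes ready once a worker has run the corresponding job.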
Using packaged_task also puts the burden of pushing exceptions through the future into the hands of the packaged_task. That way, your functions do not need to call set_exception(). They only have to return or throw.
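To illustrate the exception path, here is a small sketch in which the task simply throws and the caller sees the exception when it calls `get()` on the future. The names `parsePositive` and `runAndDescribe` are made up for this example, and the task is invoked on the calling thread only to keep the sketch short.

```cpp
#include <future>
#include <stdexcept>
#include <string>

// Hypothetical task that can fail: it just throws, with no set_exception() call.
int parsePositive(int value)
{
    if (value <= 0)
        throw std::invalid_argument("value must be positive");
    return value;
}

// Runs the task and reports what the future delivers.
std::string runAndDescribe(int value)
{
    std::packaged_task<int(int)> task(parsePositive);
    std::future<int> fut = task.get_future();
    task(value);  // a thrown exception is captured in the shared state here
    try {
        return "value: " + std::to_string(fut.get());
    } catch (const std::invalid_argument& e) {
        // get() rethrows the exception the task threw
        return std::string("error: ") + e.what();
    }
}
```

Calling `runAndDescribe(5)` takes the normal path, while `runAndDescribe(-1)` ends up in the `catch` block, even though `parsePositive` never touches a promise.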