
Generic C++11 function wrapper for task-based parallelism

Tags: c++, c++11

I am implementing a work-stealing algorithm and writing a generic function wrapper that takes a promise as one of the variadic arguments to the wrapper template. I want to create tasks from these function wrappers and have each node communicate with its dependent nodes through promises. Each node maintains a list of dependent nodes and promises/futures, and a node can run once it has checked that all of its futures have been set. The promise type varies with the job the function wrapper is doing, because different jobs return different objects. For example, if a single algorithm is broken into separate operations such as read and decode a message, perform checks on the object, and return the result of all checks, each of these actions fulfils a different promise (object, boolean, result).

The book C++ Concurrency in Action has a function wrapper implementation; however, it doesn't handle this use case. In other references online, I've seen the promise hardcoded, like std::promise, which covers only one type.

Can someone advise how I can write a wrapper to achieve the following...

void add(int a, int b, std::promise<int>&& prms)
{
   int res = a + b;
   try {
      prms.set_value(res);
   }
   catch(...)
   {
      prms.set_exception(std::current_exception());
   }
}

int main()
{
   std::promise<int> prms;
   std::future<int> fut = prms.get_future();
   FunctionWrapper myFunctor(add, 10, 20, std::move(prms));

   // add the functor to the queue and it will be retrieved by a thread
   // that executes the task. since i have the future, i can pass it to the 
   // dependent worknode
}

I tried writing the code below, but had difficulty getting it to work.

#ifndef FUNCTIONWRAPPER_HPP
#define FUNCTIONWRAPPER_HPP

template<typename F, typename R, typename... Args>
class FunctionWrapper
{
  class implbase
  {
  public:
    virtual ~implbase();
    virtual R execute(Args...)=0;
  };

  class impl : public implbase
  {
  public:
    impl(F&& f) : func(std::move(f)) {}
    virtual R execute(Args... args) { return func(args...); }

  private:
    F func;
  };

  std::shared_ptr<impl> internalFunc;

public:
  FunctionWrapper(F&& f) : internalFunc(0)
  {
    internalFunc = new impl<F, R, Args...>(f);
  }

  FunctionWrapper(const FunctionWrapper& other)
    : internalFunc(std::move(other.internalFunc))
  {}

  ~FunctionWrapper()
  {
    if(internalFunc)
      delete internalFunc;
  }

  R operator()(Args... args)
  {
    return internalFunc->execute(args...);
  }

  void swap(FunctionWrapper& other)
  {
    impl<R, Args...>* tmp = internalFunc;
    internalFunc = other.internalFunc;
    other.internalFunc = tmp;
  }

  FunctionWrapper& operator=(const FunctionWrapper& other)
  {
    FunctionWrapper(other).swap(*this);
    return *this;
  }

  FunctionWrapper& operator=(const F& f)
  {
    FunctionWrapper(f).swap(*this);
    return *this;
  }
};

#endif // FUNCTIONWRAPPER_HPP
rreiki asked Dec 21 '12 22:12



1 Answer

C++11 has a wrapper for doing just this! It is called std::packaged_task, and it is declared in <future>.

It wraps a callable object (a function object, lambda, function pointer, bind expression, etc.) and provides a future via its get_future() member function. The future's value type matches the return type in the task's signature.

Consider the following example:

#include <thread>
#include <future>
#include <functional>
#include <iostream>

using namespace std;

int add(int a, int b)
{
    return a + b;
}

int main()
{
    // Create a std::packaged_task and grab the future out of it.
    packaged_task<int()> myTask(bind(add, 10, 20));
    future<int> myFuture = myTask.get_future();

    // This is where you would queue up the task in your example.
    // I'll launch it on another thread just to demonstrate how it runs.
    thread myThread(std::move(myTask));
    myThread.detach();

    // myFuture.get() will block until the task completes.
    // ...or throw if the task throws an exception.
    cout << "The result is: " << myFuture.get() << endl;
    return 0;
}

As you can see, rather than passing in a promise, we are counting on the packaged_task to create the promise and give us the future.
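Because each packaged_task hands back a future of its own result type, tasks that return different objects can still share one queue if the task itself is erased to a plain void() callable. The following is only a sketch under assumptions: the names TaskQueue, submit and drain are hypothetical and do not come from the question or from the standard library, and the queue is drained on the calling thread rather than by a work-stealing pool.

```cpp
#include <functional>
#include <future>
#include <memory>
#include <queue>
#include <string>
#include <utility>

// Hypothetical queue type: every task is erased to a void() callable.
using TaskQueue = std::queue<std::function<void()>>;

// Hypothetical helper: wraps any nullary callable in a packaged_task,
// pushes a type-erased runner onto the queue, and returns the future.
template <typename F>
auto submit(TaskQueue& queue, F f) -> std::future<decltype(f())>
{
    using R = decltype(f());
    // std::function needs a copyable target and packaged_task is
    // move-only, so the task is held through a shared_ptr.
    auto task = std::make_shared<std::packaged_task<R()>>(std::move(f));
    std::future<R> result = task->get_future();
    queue.push([task] { (*task)(); });
    return result;
}

// Runs everything currently queued on the calling thread.
inline void drain(TaskQueue& queue)
{
    while (!queue.empty()) {
        std::function<void()> job = std::move(queue.front());
        queue.pop();
        job();
    }
}
```

With this shape, a caller could submit one task that yields an int and another that yields a std::string, keep both futures, and pass them on to the dependent nodes while the worker only ever sees void() jobs.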

Also, using a bind expression lets us hand the arguments to the task up front, and the task holds onto them until it is called.
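For comparison, here is a small sketch of three ways the arguments can reach the task: bound with std::bind, captured by a lambda, or supplied when the task is invoked. The function names run_bound, run_lambda and run_late are made up for illustration, and each task is run inline instead of on a worker thread.

```cpp
#include <functional>
#include <future>

int add(int a, int b)
{
    return a + b;
}

// Arguments bound up front with std::bind: the task is nullary.
int run_bound()
{
    std::packaged_task<int()> task(std::bind(add, 10, 20));
    std::future<int> result = task.get_future();
    task();
    return result.get();
}

// The same idea with a lambda that captures the arguments by value.
int run_lambda()
{
    int a = 10, b = 20;
    std::packaged_task<int()> task([a, b] { return add(a, b); });
    std::future<int> result = task.get_future();
    task();
    return result.get();
}

// Arguments supplied at call time: the signature names them instead.
int run_late(int a, int b)
{
    std::packaged_task<int(int, int)> task(add);
    std::future<int> result = task.get_future();
    task(a, b);
    return result.get();
}
```

The nullary forms are usually the convenient ones for a task queue, since whoever pops the task does not need to know its arguments.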

Using packaged_task also puts the burden of pushing exceptions through the future into the hands of the packaged_task. That way, your functions do not need to call set_exception(). They only have to return or throw.
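A minimal sketch of that behavior, assuming a hypothetical task body parse_positive that reports failure by throwing: the exception is stored when the task runs and is rethrown only when get() is called on the future.

```cpp
#include <future>
#include <stdexcept>
#include <string>

// Hypothetical task body: it signals failure by throwing and never
// touches a promise directly.
int parse_positive(int value)
{
    if (value <= 0)
        throw std::invalid_argument("value must be positive");
    return value;
}

// Runs the task inline and describes what the future delivers.
std::string run_and_describe(int value)
{
    std::packaged_task<int(int)> task(parse_positive);
    std::future<int> result = task.get_future();
    task(value);  // an exception thrown here is captured, not propagated

    try {
        return "ok: " + std::to_string(result.get());
    } catch (const std::invalid_argument& e) {
        // get() rethrows the exception stored by the task.
        return std::string("failed: ") + e.what();
    }
}
```

Here the catch block sits next to get(), which is where a dependent node would learn that the task it was waiting on failed.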

Sean Cline answered Oct 31 '22 18:10
