 

How to make Intel TBB multifunction_node with dynamic number of ports?

I'm new to the Intel TBB library. As you can see, my question is about tbb::flow::graph. I need to implement logic like this:

The user draws a graph of logic blocks. Every block (node) can have any number of connections (edges), so each block can choose where to send its data next. My program then builds that graph with the TBB library and performs the calculations.

So I don't know whether it is possible to construct a node (I guess it has to be a multifunction_node) with a dynamic number of output ports. Could you please show me how to do it?

asked Oct 27 '15 at 12:10 by Max Pashkov



1 Answer

Unfortunately there is no way (without dynamic compilation) to change the number of output ports in a multifunction_node. You can create the maximum number of ports (which is controlled by a macro switch and depends on the compiler), and just attach to the ports dynamically. If you do a try_put to a port and there is no successor attached, the try_put fails and you can react to this at runtime.

Another way to do it (albeit a somewhat frustrating one, I think) is to build a binary tree of two-port multifunction_nodes. If your message is a class with the output destination as a field, build each node to examine one bit of the destination and output to port 0 or port 1, depending on the result of the mask. The scheduler's short-circuiting would steer the output through the tree relatively quickly, but you would pay a small penalty for the multiple dynamic calls.

Or you could use some base other than 2 (say, 10).
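The per-node decision in such a tree can be sketched in plain C++, leaving out the TBB wiring. This assumes hypothetical helpers `bit_port` (the binary, mask-based case) and `route_ports` (any base); both number the levels from the root and consume the most significant digit first, so leaves are ordered by destination index.

```cpp
#include <vector>

// Binary tree: the node at `level` (0 = root) masks one bit of the destination.
int bit_port(int dest, int level, int depth) {
    return (dest >> (depth - 1 - level)) & 1;
}

// Radix-`base` tree: the port taken at each level, root first.
// Level l uses digit (depth-1-l) of `dest` in the given base.
std::vector<int> route_ports(int dest, int base, int depth) {
    std::vector<int> ports(depth);
    for (int level = depth - 1; level >= 0; --level) {
        ports[level] = dest % base;  // least significant digit decides the last hop
        dest /= base;
    }
    return ports;
}
```

For example, destination 5 in a depth-3 binary tree (5 = 101 in binary) takes ports 1, 0, 1, and destination 37 in a depth-2 base-10 tree takes ports 3, 7. A depth-d tree of base k reaches k^d destinations through (k^d - 1)/(k - 1) interior nodes, and every message passes through d of them, which is where the extra dynamic calls come from.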

Addendum: After talking with Mike (the designer of flow::graph), we realized there is another way to handle this, which would allow a dynamic number of ports. You would have to do a little low-level stuff, but it goes like this:

```cpp
#include "tbb/tbb.h"
#include <iostream>

using namespace tbb::flow;

tbb::spin_mutex io_lock;  // serializes console output from concurrently running bodies
typedef broadcast_node<int> bnode_element_t;
typedef tbb::concurrent_vector<bnode_element_t *> output_port_vector_t;

// Body of the dispatching function_node: chooses a broadcast_node ("port") at run time.
struct multioutput_function_body {
    output_port_vector_t &my_ports;  // shared by every copy of the body; does not own the nodes
    multioutput_function_body(output_port_vector_t &_ports) : my_ports(_ports) {}
    multioutput_function_body(const multioutput_function_body &other) : my_ports(other.my_ports) { }
    continue_msg operator()(const int in) {
        int current_size = static_cast<int>(my_ports.size());
        if(in < 0 || in >= current_size) {
            // error condition?  grow concurrent_vector?
            tbb::spin_mutex::scoped_lock gl(io_lock);
            std::cout << "Received input out of range(" << in << ")" << std::endl;
        }
        else {
            // do computation, then send the result to the selected port
            my_ports[in]->try_put(in*2);
        }
        return continue_msg();
    }
};

// Body of each downstream node: prints what it received and passes it on.
struct output_function_body {
    int my_prefix;
    output_function_body(int i) : my_prefix(i) { }
    int operator()(const int i) {
        tbb::spin_mutex::scoped_lock gl(io_lock);
        std::cout << " output node "<< my_prefix << " received " << i << std::endl;
        return i;
    }
};

int main() {
    graph g;
    output_port_vector_t output_ports;
    // The node's own continue_msg output is ignored; results leave via output_ports.
    function_node<int> my_node(g, unlimited, multioutput_function_body(output_ports) );
    // create broadcast_nodes
    for( int i = 0; i < 20; ++i) {
        bnode_element_t *bp = new bnode_element_t(g);
        output_ports.push_back(bp);
    }

    // attach the output nodes to the broadcast_nodes
    for(int i = 0; i < 20; ++i) {
        function_node<int,int> *fp = new function_node<int,int>(g, unlimited, output_function_body(i));
        make_edge(*(output_ports[i]),*fp);
    }

    // 21 inputs for 20 ports: input 20 exercises the out-of-range branch
    for( int i = 0; i < 21; ++i) {
        my_node.try_put(i);
    }
    g.wait_for_all();
    return 0;
}
```

Notes on the above:

  • We are creating a concurrent_vector of pointers to broadcast_nodes. The successors to the function_node are attached to these broadcast_nodes. The output of the function_node is ignored.
  • The concurrent_vector is passed to the constructor of the multioutput_function_body. We don't need a multifunction_node at all in this case. The multioutput_function_body decides at runtime which broadcast_node to try_put to. Note that we are doing explicit try_puts to the broadcast_nodes. These result in a task being spawned for each try_put. Spawned tasks are faster than enqueued tasks, but there is more scheduling overhead than just returning a value from a node.
  • I didn't add the cleanup of the heap-allocated broadcast_nodes and the output function_nodes. The "obvious" place to delete the broadcast_nodes would be the destructor of multioutput_function_body. You should not do this: creating the function_node copy-constructs the passed-in function body, so multiple copies of the body hold a reference to the same concurrent_vector of broadcast_node pointers, and the first copy to be destroyed would delete nodes the others still use. Do the deletion after g.wait_for_all().

I used concurrent_vector because it allows access to the pointers while the concurrent_vector is being modified. Whether additional broadcast_node pointers can safely be added while the graph is executing is an open question. I hope you are only creating the nodes up front and using them as-is, not modifying them on the fly. A concurrent_vector does not reallocate or move already-initialized elements when it grows; that is why I used it, but don't treat this as a complete answer if you are hoping to add nodes while the graph is running.

answered Sep 22 '22 at 17:09 by cahuson
