I'm new to the Intel TBB library. As you can see, my question is about tbb::flow::graph. I need to implement logic like this:
The user draws a graph out of logic blocks. Every block (node) can have an unlimited number of connections (edges), so every block (node) can choose where to send its data next. My program then builds that graph with the TBB library and performs the calculations.
I don't know whether it is possible to construct a node (I guess it has to be a multifunction_node) with a dynamic number of output ports. Could you show me how to do it, please?
Unfortunately there is no way (without dynamic compilation) to change the number of output ports in a multifunction_node. You can create the maximum number of ports (which is controlled by a macro switch and depends on the compiler), and just attach to the ports dynamically. If you do a try_put to a port and there is no successor attached, the try_put fails and you can react to this at runtime.
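To make the "fixed maximum, attach at runtime, react to a failed put" idea concrete, here is a minimal library-free model in plain C++. It is not TBB code: port_bank, attach, and kMaxPorts are hypothetical names invented for the illustration, and the try_put here only mimics the behaviour described above, where a put to a port with no successor fails.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>

// Hypothetical upper bound on ports, standing in for the library's compile-time limit.
constexpr std::size_t kMaxPorts = 8;

// Plain C++ model (not the TBB API): a fixed bank of output ports.
struct port_bank {
    // An empty std::function means nothing is attached to that port.
    std::array<std::function<void(int)>, kMaxPorts> successors;

    // Attach a successor to a port at runtime; throws std::out_of_range on a bad index.
    void attach(std::size_t port, std::function<void(int)> successor) {
        successors.at(port) = std::move(successor);
    }

    // Returns false when the port is out of range or has no successor,
    // so the caller can react at runtime.
    bool try_put(std::size_t port, int value) const {
        if (port >= kMaxPorts || !successors[port]) return false;
        successors[port](value);
        return true;
    }
};
```

The caller checks the return value of try_put and decides what to do with a message that had nowhere to go (drop it, log it, or route it to a fallback port).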
Another way to do it (albeit with some frustration, I think) is to build a binary tree of two-port multifunction_nodes. If your message is a class with the output destination as a field, construct each node to test one bit of the destination and output to port 0 or port 1, depending on the result of the mask. The scheduler's short-circuiting would steer the output through the tree relatively quickly, but you'd pay a bit of a penalty for the multiple dynamic calls.
Or you could use some base other than 2 (say, 10).
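The routing decision itself can be sketched without any TBB types. The snippet below only computes which port each tree level would pick for a given destination, for base 2 or any other base; route_ports and leaf_reached are hypothetical helper names, and the sketch assumes a complete tree whose root examines the most significant digit.

```cpp
#include <cassert>
#include <vector>

// Hypothetical helper: the output port chosen at each tree level, root first.
// Each level looks at one base-`base` digit of the destination.
std::vector<int> route_ports(int destination, int base, int levels) {
    std::vector<int> ports(levels);
    // Peel digits off from least significant (deepest level) to most significant (root).
    for (int level = levels - 1; level >= 0; --level) {
        ports[level] = destination % base;
        destination /= base;
    }
    return ports;
}

// Follows the chosen ports down a complete tree and returns the leaf index reached.
int leaf_reached(const std::vector<int>& ports, int base) {
    int leaf = 0;
    for (int port : ports) leaf = leaf * base + port;
    return leaf;
}
```

With base 2 and three levels, destination 5 takes ports 1, 0, 1; with base 10 and three levels, destination 437 takes ports 4, 3, 7. A larger base makes the tree shallower, at the cost of more output ports per node.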
Addendum: After talking with Mike (the designer of flow::graph), we realized there is another way to handle this, which would allow a dynamic number of ports. You would have to do a little low-level stuff, but it goes like this:
#include "tbb/tbb.h"
#include <iostream>

using namespace tbb::flow;

tbb::spin_mutex io_lock;
typedef broadcast_node<int> bnode_element_t;
typedef tbb::concurrent_vector<bnode_element_t *> output_port_vector_t;

struct multioutput_function_body {
    output_port_vector_t &my_ports;
public:
    multioutput_function_body(output_port_vector_t &_ports) : my_ports(_ports) {}
    multioutput_function_body(const multioutput_function_body &other) : my_ports(other.my_ports) { }
    continue_msg operator()(const int in) {
        int current_size = my_ports.size();
        if(in >= current_size) {
            // error condition? grow concurrent_vector?
            tbb::spin_mutex::scoped_lock gl(io_lock);
            std::cout << "Received input out of range(" << in << ")" << std::endl;
        }
        else {
            // do computation
            my_ports[in]->try_put(in*2);
        }
        return continue_msg();
    }
};

struct output_function_body {
    int my_prefix;
    output_function_body(int i) : my_prefix(i) { }
    int operator()(const int i) {
        tbb::spin_mutex::scoped_lock gl(io_lock);
        std::cout << " output node "<< my_prefix << " received " << i << std::endl;
        return i;
    }
};

int main() {
    graph g;
    output_port_vector_t output_ports;
    function_node<int> my_node(g, unlimited, multioutput_function_body(output_ports) );
    // create broadcast_nodes
    for( int i = 0; i < 20; ++i) {
        bnode_element_t *bp = new bnode_element_t(g);
        output_ports.push_back(bp);
    }
    // attach the output nodes to the broadcast_nodes
    for(int i = 0; i < 20; ++i) {
        function_node<int,int> *fp = new function_node<int,int>(g, unlimited, output_function_body(i));
        make_edge(*(output_ports[i]),*fp);
    }
    for( int i = 0; i < 21; ++i) {
        my_node.try_put(i);
    }
    g.wait_for_all();
    return 0;
}
Notes on the above:
The code creates a concurrent_vector of pointers to broadcast_nodes. The successors to the function_node are attached to these broadcast_nodes. The output of the function_node is ignored.
The concurrent_vector is passed to the constructor of multioutput_function_body. We don't need a multifunction_node at all in this case. The multioutput_function_body decides which broadcast_node to try_put to at runtime. Note we are doing explicit try_puts to the broadcast_nodes. These result in a task being spawned for each try_put. Spawned tasks are faster than enqueued tasks, but there is more scheduling overhead than just returning a value from a node.
The example does not delete the heap-allocated broadcast_nodes and the output function_nodes. The "obvious" place to do the deletion of the broadcast_nodes would be in the destructor of multioutput_function_body. You should not do this, as the creation of the function_node results in the copy-construction of the passed-in function bodies, and multiple copies of the function body will hold a reference to the same concurrent_vector of broadcast_node pointers, so each copy's destructor would try to delete the same nodes. Do the deletion after the g.wait_for_all().
I used concurrent_vector because it allows access to the pointers while the concurrent_vector is being modified. The question of whether additional broadcast_node pointers can be added during the execution of the graph is open. I hope you are only creating the nodes and using them as-is, not modifying them on-the-fly. concurrent_vectors do not reallocate and move already-initialized elements when growing; that is why I used one, but I don't think this is a complete answer if you are hoping to add additional nodes while the graph is running.
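The warning about the destructor can be illustrated without TBB. In the sketch below, fake_node is a hypothetical stand-in for a node that stores a copy of the body it is given (the real library may make more copies than this), and counting_body only counts destructor runs instead of deleting anything. delete_ports is likewise a hypothetical helper showing the "clean up once, after the graph is done" alternative.

```cpp
#include <cassert>
#include <vector>

// Body that borrows the port list by reference, like multioutput_function_body.
struct counting_body {
    std::vector<int*>& ports;
    int& destructions;
    counting_body(std::vector<int*>& p, int& d) : ports(p), destructions(d) {}
    // If this destructor deleted the ports, it would do so once per copy.
    ~counting_body() { ++destructions; }
};

// Hypothetical stand-in for a node that keeps its own copy of the body.
struct fake_node {
    counting_body body;
    explicit fake_node(const counting_body& b) : body(b) {}
};

// Builds one body, hands it to one node, and reports how many destructors ran.
int destructor_runs_for_one_node() {
    std::vector<int*> ports;
    int destructions = 0;
    {
        counting_body original(ports, destructions);
        fake_node node(original);  // the node copy-constructs its own body
    }                              // both the copy and the original are destroyed here
    return destructions;
}

// Safe alternative: the owner frees the ports exactly once, after all work is done.
void delete_ports(std::vector<int*>& ports) {
    for (int* p : ports) delete p;
    ports.clear();
}
```

Even with a single node, the destructor runs twice here, once for the original and once for the node's copy, which is why the deletion belongs in the code that owns the vector rather than in the body.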