I am trying to concoct a C++ data structure for modeling a simple N stage process where each stage can be replaced with a different function. One way is to use the OO approach and have an abstract base class with a virtual method for each stage; e.g.:
class Pipeline {
protected:
virtual void processA(const In& in, BType& B) = 0;
virtual void processB(const BType& B, BType& C) = 0;
virtual void processC(const CType& C, BType& D) = 0;
virtual void processD(const DType& D, Out& out) = 0;
public:
void process(const In& in, Out& out) {
Btype B;
processA(in, B);
Ctype C;
processB(B, C);
Btype D;
processC(C, D);
processD(D,out);
}
};
The problem with this approach, if each of the N stages can be interchanged with M processes you have N*M possible subclasses.
Another idea is to store function objects:
class Pipeline {
public:
std::function<void(const In& in, BType& B)> processA;
std::function<void(const In& B, CType& C)> processB;
std::function<void(const In& C, DType& D)> processC;
std::function<void(const In& D, Out& out)> processD;
void process(const In& in, Out& out) {
Btype B;
processA(in, B);
Ctype C;
processB(B, C);
Btype D;
processC(C, D);
processD(D,out);
}
};
The problem I am having with this approach is that the stages are not really independent and I would like a single object, in some cases, to store info concerning multiple stages.
Has anyone found a good data structure for a pipeline with replaceable parts? A bonus would be able to allow each stage to run concurrently.
Pointers to std function objects is a bad idea. They already can store pointers if needed.
I propose graphs.
A sink
is a consumer:
template<class...Ts>
struct sink : std::function<void(Ts...)> {
using std::function<void(Ts...)>::function;
};
A source is something that takes a consumer, and satisfies it:
template<class...Ts>
using source = sink<sink<Ts...>>;
A process is something that connects a producer to a consumer, possibly changing types:
template<class In, class Out>
using process = sink< source<In>, sink<Out> >;
Then we can define a pipeline operation:
template<class In, class Out>
sink<In> operator|( process< In, Out > a, sink< Out > b ){
return [a,b]( In in ){
a( [&in]( sink<In> s )mutable{ s(std::forward<In>(in)); }, b );
};
}
template<class In, class Out>
source<Out> operator|( source< In > a, process< In, Out > b ){
return [a,b]( sink<Out> out ){
b( a, out );
};
}
template<class In, class Mid, class Out>
process<In, Out> operator|( process<In, Mid> a, process<Mid, Out> b ){
return [a,b]( source<In> in, sink<Out> out ){
a( in, b|out ); // or b( in|a, out )
};
}
template<class...Ts>
sink<> operator|( source<Ts...> a, sink<Ts...> b ){
return[a,b]{ a(b); };
}
that should do it.
I assume that the state of the component pipeline elements is cheap to copy, so shared ptrs or raw pointers or somesuch.
If you want concurrency, simply spin up processes that provide queues of values and pass futures through the pipeline. But I think usually it is best to attach elements together and make the pipeline async, instead of stages.
Having the pipeline elements be things like gsl spans is also useful, allowing stages to have fixed buffers and pass results of computation in chunks without allocating.
A toy process to get you started:
process<char, char> to_upper = []( source<char> in, sink<char> out ){
in( [&out]( char c ) { out( std::toupper(c) ); } );
};
and a source:
source<char> hello_world = [ptr="hello world"]( sink<char> s ){
for (auto it = ptr; *it; ++it ){ s(*it); }
};
sink<char> print = [](char c){std::cout<<c;};
int main(){
auto prog = hello_world|to_upper|print;
prog();
}
outputs "HELLO WORLD"
.
live demo: https://ideone.com/MC4fDV
Note that this is a push based pipeline. A pull based pipeline is an alternative. Push pipelines allow easier job batching; pull pipelines can avoid making data that nobody wants. Push makes data spreading natural; pull makes data gathering natural.
Coroutines can also make this more natural. In a sense, the source is a coroutine that suspends when it calls the sink in a push pipeline. And in a pull the other way around. Coroutines may make push/pull to both work eitb the same processing code.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With