I'm trying to create a generic, reusable pipeline in C# to share across many projects. The idea is very similar to ASP.NET Core middleware: a large bi-directional pipeline function that can be composed dynamically (similar to a business rules engine).
It needs to take an input model, run it through a series of previously loaded processors, and return an output model wrapped in a supermodel alongside the input.
Here's what I've done. I've created a Context class that represents the overall data/model:
public class Context<InputType, OutputType> where InputType : class, new() where OutputType : class, new()
{
    public Context()
    {
        UniqueToken = Guid.NewGuid(); // note: new Guid() would always yield Guid.Empty
        Logs = new List<string>();
    }

    public InputType Input { get; set; }
    public OutputType Output { get; set; }
    public Guid UniqueToken { get; }
    public DateTime ProcessStartedAt { get; set; }
    public DateTime ProcessEndedAt { get; set; }

    public long ProcessTimeInMilliseconds
    {
        get
        {
            return (long)ProcessEndedAt.Subtract(ProcessStartedAt).TotalMilliseconds;
        }
    }

    public List<string> Logs { get; set; }
}
Then I created an interface to enforce a signature on the real processors:
public interface IProcessor
{
    void Process<InputType, OutputType>(Context<InputType, OutputType> context, IProcessor next) where InputType : class, new() where OutputType : class, new();
}
Then I created a Container, to manage the whole pipeline:
public class Container<InputType, OutputType> where InputType : class, new() where OutputType : class, new()
{
    public static List<IProcessor> Processors { get; set; }

    public static void Initialize()
    {
        LoadProcessors();
    }

    private static void LoadProcessors()
    {
        // loading processors from assemblies dynamically
    }

    public static Context<InputType, OutputType> Execute(InputType input)
    {
        if (Processors.Count == 0)
        {
            throw new FrameworkException("No processor is found to be executed");
        }
        if (input.IsNull())
        {
            throw new BusinessException($"{nameof(InputType)} is not provided for processing pipeline");
        }
        var message = new Context<InputType, OutputType>();
        message.Input = input;
        message.ProcessStartedAt = DateTime.Now;
        Processors[0].Process(message, Processors[1]);
        message.ProcessEndedAt = DateTime.Now;
        return message;
    }
}
I know how to load processors dynamically from assemblies in a given folder, so that's not a problem. But I'm stuck on these points:
- A Next property on each processor, but I guess that's against the SRP, since each processor should only care about doing its job, not about keeping the chain.
- An Order property on each processor (with a check that no duplicate values occur), but that also seems to violate the SRP: each processor should only care about processing, not about its order.

I would suggest a slightly different design. The idea is based on the decorator pattern.
First, I would make Context a non-generic class and remove the input and output values. In my design, the context only holds contextual information (like the processing time and log messages):
public class Context
{
    public Context()
    {
        UniqueToken = Guid.NewGuid(); // new Guid() would always yield Guid.Empty
        Logs = new List<string>();
    }

    public Guid UniqueToken { get; }
    public DateTime ProcessStartedAt { get; set; }
    public DateTime ProcessEndedAt { get; set; }

    public long ProcessTimeInMilliseconds
    {
        get
        {
            return (long)ProcessEndedAt.Subtract(ProcessStartedAt).TotalMilliseconds;
        }
    }

    public List<string> Logs { get; set; }
}
Then, I would make the processor interface generic:
public interface IProcessor<InputType, OutputType>
{
    OutputType Process(InputType input, Context context);
}
Then I turned your Container into a Pipeline with generic type arguments:
public interface IPipeline<InputType, OutputType>
{
    OutputType Execute(InputType input, out Context context);
    OutputType ExecuteSubPipeline(InputType input, Context context);
}
The difference between the two functions is that the former initializes the context while the latter only uses it. You may want to split this into a public and an internal interface if you don't want your clients to access ExecuteSubPipeline().
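If you go that route, a minimal sketch of the split could look like this (the ISubPipeline name and the Context/Doubler stand-ins are my own, not part of the design above):

```csharp
using System;
using System.Collections.Generic;

// Minimal stand-in for the Context class shown earlier.
public class Context
{
    public List<string> Logs { get; } = new List<string>();
}

// Clients only ever see Execute() through this public interface.
public interface IPipeline<InputType, OutputType>
{
    OutputType Execute(InputType input, out Context context);
}

// The chaining entry point stays internal to the pipeline assembly.
internal interface ISubPipeline<InputType, OutputType> : IPipeline<InputType, OutputType>
{
    OutputType ExecuteSubPipeline(InputType input, Context context);
}

// Trivial single-stage implementation to show both entry points.
public class Doubler : ISubPipeline<int, int>
{
    public int Execute(int input, out Context context)
    {
        context = new Context();
        return ExecuteSubPipeline(input, context);
    }

    public int ExecuteSubPipeline(int input, Context context)
    {
        context.Logs.Add("Doubler ran");
        return input * 2;
    }
}
```

With this split, code outside the assembly can only start a pipeline at the top, while wrapper stages inside the assembly can still chain into each other.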
The idea then is to encapsulate multiple pipeline objects within each other, each holding more processors than the last. You start with a pipeline object holding only a single processor. Then you wrap it in another pipeline object, and so on. For this, I started with an abstract base class. This base class is associated with a processor and has a function AppendProcessor() that creates a new pipeline with a given processor added:
public abstract class PipelineBase<InputType, ProcessorInputType, OutputType> : IPipeline<InputType, OutputType>
{
    protected IProcessor<ProcessorInputType, OutputType> currentProcessor;

    public PipelineBase(IProcessor<ProcessorInputType, OutputType> processor)
    {
        currentProcessor = processor;
    }

    public IPipeline<InputType, ProcessorOutputType> AppendProcessor<ProcessorOutputType>(IProcessor<OutputType, ProcessorOutputType> processor)
    {
        return new Pipeline<InputType, OutputType, ProcessorOutputType>(processor, this);
    }

    public OutputType Execute(InputType input, out Context context)
    {
        context = new Context();
        context.ProcessStartedAt = DateTime.Now;
        var result = ExecuteSubPipeline(input, context);
        context.ProcessEndedAt = DateTime.Now;
        return result;
    }

    public abstract OutputType ExecuteSubPipeline(InputType input, Context context);
}
Now, we have two concrete implementations of this pipeline: one terminal implementation that is the starting point of any pipeline, and one wrapper pipeline:
public class TerminalPipeline<InputType, OutputType> : PipelineBase<InputType, InputType, OutputType>
{
    public TerminalPipeline(IProcessor<InputType, OutputType> processor)
        : base(processor)
    { }

    public override OutputType ExecuteSubPipeline(InputType input, Context context)
    {
        return currentProcessor.Process(input, context);
    }
}

public class Pipeline<InputType, ProcessorInputType, OutputType> : PipelineBase<InputType, ProcessorInputType, OutputType>
{
    IPipeline<InputType, ProcessorInputType> previousPipeline;

    public Pipeline(IProcessor<ProcessorInputType, OutputType> processor, IPipeline<InputType, ProcessorInputType> previousPipeline)
        : base(processor)
    {
        this.previousPipeline = previousPipeline;
    }

    public override OutputType ExecuteSubPipeline(InputType input, Context context)
    {
        var previousPipelineResult = previousPipeline.ExecuteSubPipeline(input, context);
        return currentProcessor.Process(previousPipelineResult, context);
    }
}
For ease of use, let's also create a helper function that creates a terminal start pipeline (to allow type argument inference):
public static class Pipeline
{
    public static TerminalPipeline<InputType, OutputType> Create<InputType, OutputType>(IProcessor<InputType, OutputType> processor)
    {
        return new TerminalPipeline<InputType, OutputType>(processor);
    }
}
Then, we can use this structure with various processors. For example:
class FloatToStringProcessor : IProcessor<float, string>
{
    public string Process(float input, Context context)
    {
        return input.ToString();
    }
}

class RepeatStringProcessor : IProcessor<string, string>
{
    public string Process(string input, Context context)
    {
        return input + input + input;
    }
}
class Program
{
    public static void Main()
    {
        var pipeline = Pipeline
            .Create(new FloatToStringProcessor())
            .AppendProcessor(new RepeatStringProcessor());

        Context ctx;
        var result = pipeline.Execute(5, out ctx);
        Console.WriteLine($"Pipeline result: {result}");
        Console.WriteLine($"Pipeline execution took {ctx.ProcessTimeInMilliseconds} milliseconds");
    }
}
This will print
Pipeline result: 555
Pipeline execution took 6 milliseconds
I don't understand what you mean by short-circuiting. In my opinion, short-circuiting only makes sense for (at least) binary operators, where one operand does not need to be evaluated. Since your operators are all unary, this cannot really be applied. A processor can always check its input and return it directly when it finds that no processing is necessary.
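To illustrate that last point, here is a hedged sketch of such an "early return" processor (the TrimProcessor and the Context/IProcessor stubs are illustrative, not part of the code above):

```csharp
using System;
using System.Collections.Generic;

// Minimal stand-ins for the Context and IProcessor<,> types above.
public class Context
{
    public List<string> Logs { get; } = new List<string>();
}

public interface IProcessor<InputType, OutputType>
{
    OutputType Process(InputType input, Context context);
}

// The closest analogue of short-circuiting in a unary chain:
// the processor inspects its input and returns it unchanged
// when there is nothing to do.
public class TrimProcessor : IProcessor<string, string>
{
    public string Process(string input, Context context)
    {
        if (string.IsNullOrEmpty(input) || input.Trim().Length == input.Length)
        {
            context.Logs.Add("TrimProcessor: nothing to trim, passing input through");
            return input;
        }
        return input.Trim();
    }
}
```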
Dynamic loading can be added easily by adding something like LoadProcessors() to the IPipeline interface, similar to ExecuteSubPipeline(). In that case, the processor objects must be representatives (which are still properly typed); LoadProcessors() can then replace them with the actual processors once those are loaded.
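One possible shape for such a representative, sketched here as a lazy proxy (the LazyProcessor name and the factory delegate are my own invention; your assembly-scanning code would supply the factory):

```csharp
using System;
using System.Collections.Generic;

// Minimal stand-ins for the Context and IProcessor<,> types above.
public class Context
{
    public List<string> Logs { get; } = new List<string>();
}

public interface IProcessor<InputType, OutputType>
{
    OutputType Process(InputType input, Context context);
}

// A properly typed placeholder: it defers creating the real processor
// until the first call, so pipelines can be composed before the
// implementing assemblies are loaded.
public class LazyProcessor<InputType, OutputType> : IProcessor<InputType, OutputType>
{
    private readonly Lazy<IProcessor<InputType, OutputType>> inner;

    public LazyProcessor(Func<IProcessor<InputType, OutputType>> factory)
    {
        inner = new Lazy<IProcessor<InputType, OutputType>>(factory);
    }

    public OutputType Process(InputType input, Context context)
    {
        return inner.Value.Process(input, context);
    }
}

// Example real processor, standing in for a dynamically loaded one.
public class UppercaseProcessor : IProcessor<string, string>
{
    public string Process(string input, Context context) => input.ToUpperInvariant();
}
```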