TPL Dataflow provides a very useful function:
public static IPropagatorBlock<TInput, TOutput> Encapsulate<TInput, TOutput>(
ITargetBlock<TInput> target,
ISourceBlock<TOutput> source)
to enable you to, well, encapsulate multiple blocks into a single transform block. It returns a
IPropagatorBlock<TInput, TOutput>
which represents the start and end blocks of your pipeline.
However, if the last block in my pipeline is an ActionBlock, I can't use this, since an ActionBlock is not a SourceBlock, and the return type of the function would be an ITargetBlock, not an IPropagatorBlock.
Essentially, what I'm looking for is something like this function:
public static ITargetBlock<TStart> Encapsulate<TStart, TEnd>(
ITargetBlock<TStart> startBlock,
ActionBlock<TEnd> endBlock)
Is this a sensible thing to write, or am I missing something simple? I'm not quite sure how to write it - particularly wiring up the completion. Would I need to create my own custom block type?
EDIT:
Ok, so having read the response from @Panagiotis Kanavos, and done some tinkering, I've come up with this. This is based on the EncapsulatingPropagator class, which is what the existing DataflowBlock.Encapsulate method uses:
internal sealed class EncapsulatingTarget<TStart, TEnd> : ITargetBlock<TStart>
{
private readonly ITargetBlock<TStart> startBlock;
private readonly ActionBlock<TEnd> endBlock;
public EncapsulatingTarget(ITargetBlock<TStart> startBlock, ActionBlock<TEnd> endBlock)
{
this.startBlock = startBlock;
this.endBlock = endBlock;
}
public Task Completion
{
get { return this.endBlock.Completion; }
}
public void Complete()
{
this.startBlock.Complete();
}
void IDataflowBlock.Fault(Exception exception)
{
if (exception == null)
{
throw new ArgumentNullException("exception");
}
this.startBlock.Fault(exception);
}
public DataflowMessageStatus OfferMessage(
DataflowMessageHeader messageHeader,
TStart messageValue,
ISourceBlock<TStart> source,
bool consumeToAccept)
{
return this.startBlock.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
}
}
Encapsulate isn't used to abstract an existing pipeline, it's used to create a propagator block that requires custom behaviour that can't be implemented using the existing blocks and links.
For example, the Sliding Window sample buffers all incoming messages posted to its input block and outputs a batch of all retrieved messages when a sliding window expires to its output block.
The names of the method create a lot of confusion but they do make sense when you understand their purpose:
An Encapsulate
variant that accepts an ActionBlock method as source
isn't useful because you can simply link from any previous step to an action block.
EDIT
If you want to modularize a pipeline, ie break it into reuseable, more manageable you can create a class that construct, you can use a plain old class. In that class, you build your pipeline fragment as normal, link the blocks (ensuring completion is propagated) and then expose the first step and the Completion task of the last step as public properties, eg:
class MyFragment
{
public TransformationBlock<SomeMessage,SomeOther> Input {get;}
public Task Completion {get;}
ActionBlock<SomeOther> _finalBlock;
public MyFragment()
{
Input=new TransformationBlock<SomeMessage,SomeOther>(MyFunction);
_finalBlock=new ActionBlock<SomeOther>(MyMethod);
var linkOptions = new DataflowLinkOptions {PropagateCompletion = true}
Input.LinkTo(_finalBlock,linkOptions);
}
private SomeOther MyFunction(SomeMessage msg)
{
...
}
private void MyMethod(SomeOther msg)
{
...
}
}
To connect the fragment to a pipeline, you only need to link from a pipeline block to the exposed Input
block. To await completion, just await on the exposed Completion
task.
You could stop here if you want, or you can implement ITargetBlock to make the fragment look like a Target block. You just need to delegate all methods to the Input block and the Completion property to the final block.
Eg:
class MyFragment:ITargetBlock<SomeMessage>
{
....
public Task Completion {get;}
public void Complete()
{
Input.Complete()
};
public void Fault(Exception exc)
{
Input.Fault(exc);
}
DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
TInput messageValue,ISourceBlock<TInput> source,bool consumeToAccept)
{
return Input.OfferMessage(messageHeader,messageValue,source,consumeToAccept);
}
}
EDIT 2
Using @bornfromanegg 's class one can separate the act of building the fragment from the boilerplate that exposes the Input and Completion:
public ITargetBlock<SomeMessage> BuildMyFragment()
{
var input=new TransformationBlock<SomeMessage,SomeOther>(MyFunction);
var step2=new TransformationBlock<SomeOther,SomeFinal>(MyFunction2);
var finalBlock=new ActionBlock<SomeFinal>(MyMethod);
var linkOptions = new DataflowLinkOptions {PropagateCompletion = true}
input.LinkTo(step2,linkOptions);
step2.LinkTo(finalBlock,linkOptions);
return new EncapsulatingTarget(input,finalBlock);
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With