
Use TPL Dataflow to encapsulate pipeline ending in an action block

TPL Dataflow provides a very useful function:

public static IPropagatorBlock<TInput, TOutput> Encapsulate<TInput, TOutput>(
    ITargetBlock<TInput> target, 
    ISourceBlock<TOutput> source)

to enable you to, well, encapsulate multiple blocks into a single transform block. It returns a

IPropagatorBlock<TInput, TOutput>

which represents the start and end blocks of your pipeline.

However, if the last block in my pipeline is an ActionBlock, I can't use this, since ActionBlock does not implement ISourceBlock, and the return type of the function would need to be an ITargetBlock, not an IPropagatorBlock.

Essentially, what I'm looking for is something like this function:

public static ITargetBlock<TStart> Encapsulate<TStart, TEnd>(
        ITargetBlock<TStart> startBlock, 
        ActionBlock<TEnd> endBlock)

Is this a sensible thing to write, or am I missing something simple? I'm not quite sure how to write it - particularly wiring up the completion. Would I need to create my own custom block type?

EDIT:

Ok, so having read the response from @Panagiotis Kanavos, and done some tinkering, I've come up with this. This is based on the EncapsulatingPropagator class, which is what the existing DataflowBlock.Encapsulate method uses:

internal sealed class EncapsulatingTarget<TStart, TEnd> : ITargetBlock<TStart>
{
    private readonly ITargetBlock<TStart> startBlock;

    private readonly ActionBlock<TEnd> endBlock;

    public EncapsulatingTarget(ITargetBlock<TStart> startBlock, ActionBlock<TEnd> endBlock)
    {
        this.startBlock = startBlock;
        this.endBlock = endBlock;
    }

    // The wrapper counts as complete only when the last block has finished.
    public Task Completion
    {
        get { return this.endBlock.Completion; }
    }

    // Completing the first block reaches the last block only if the
    // links in between were created with PropagateCompletion = true.
    public void Complete()
    {
        this.startBlock.Complete();
    }

    void IDataflowBlock.Fault(Exception exception)
    {
        if (exception == null)
        {
            throw new ArgumentNullException("exception");
        }

        this.startBlock.Fault(exception);
    }

    // Incoming messages are handed straight to the first block.
    public DataflowMessageStatus OfferMessage(
        DataflowMessageHeader messageHeader,
        TStart messageValue,
        ISourceBlock<TStart> source,
        bool consumeToAccept)
    {
        return this.startBlock.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
    }
}
bornfromanegg asked Sep 08 '15 14:09


1 Answer

Encapsulate isn't used to abstract an existing pipeline, it's used to create a propagator block that requires custom behaviour that can't be implemented using the existing blocks and links.

For example, the Sliding Window sample buffers all incoming messages posted to its input block and, when a sliding window expires, outputs a batch of all retrieved messages to its output block.

The names of the method's arguments cause a lot of confusion, but they make sense once you understand their purpose:

  • The target argument is the target (input) endpoint to which preceding blocks will connect to send messages. In this case, an ActionBlock that processes incoming messages and decides whether or not to post to the output (source) block makes sense.
  • The source argument is the source (output) endpoint to which succeeding steps will connect to receive messages. It doesn't make sense to use an ActionBlock as a source because it doesn't have any output.

An Encapsulate variant that accepts an ActionBlock as its source isn't useful, because you can simply link from any previous step to an action block.
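As a minimal sketch of that direct link (the `parse` and `print` blocks are hypothetical stand-ins, not part of the question), a TransformBlock can feed an ActionBlock and the caller can await the action block's Completion:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    static async Task Main()
    {
        // Hypothetical stages: parse a string, then print the doubled number.
        var parse = new TransformBlock<string, int>(s => int.Parse(s));
        var print = new ActionBlock<int>(n => Console.WriteLine(n * 2));

        // PropagateCompletion forwards completion (or a fault) from parse to print.
        parse.LinkTo(print, new DataflowLinkOptions { PropagateCompletion = true });

        parse.Post("1");
        parse.Post("2");
        parse.Complete();

        // Finishes once the action block has handled every message.
        await print.Completion;
    }
}
```

Without PropagateCompletion the final await would never finish, because nothing would tell the action block that no more input is coming.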

EDIT

If you want to modularize a pipeline, i.e. break it into reusable, more manageable fragments, you can use a plain old class. In that class, you build your pipeline fragment as normal, link the blocks (ensuring completion is propagated) and then expose the first step and the Completion task of the last step as public properties, e.g.:

class MyFragment
{
    public TransformBlock<SomeMessage,SomeOther> Input {get;}

    // Completion of the fragment is the completion of its last block
    public Task Completion => _finalBlock.Completion;

    readonly ActionBlock<SomeOther> _finalBlock;

    public MyFragment()
    {
        Input=new TransformBlock<SomeMessage,SomeOther>(MyFunction);
        _finalBlock=new ActionBlock<SomeOther>(MyMethod);
        var linkOptions = new DataflowLinkOptions {PropagateCompletion = true};
        Input.LinkTo(_finalBlock,linkOptions);
    }

    private SomeOther MyFunction(SomeMessage msg)
    {
    ...
    }

    private void MyMethod(SomeOther msg)
    {
    ...
    }
}

To connect the fragment to a pipeline, you only need to link from a pipeline block to the exposed Input block. To wait for completion, just await the exposed Completion task.
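A rough usage sketch, assuming a hypothetical `UpperCaseFragment` built the same way as MyFragment and a BufferBlock standing in for the rest of the pipeline:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical fragment: upper-cases a string, then prints it.
class UpperCaseFragment
{
    private readonly ActionBlock<string> _finalBlock;

    public TransformBlock<string, string> Input { get; }

    public Task Completion => _finalBlock.Completion;

    public UpperCaseFragment()
    {
        Input = new TransformBlock<string, string>(s => s.ToUpperInvariant());
        _finalBlock = new ActionBlock<string>(s => Console.WriteLine(s));
        Input.LinkTo(_finalBlock, new DataflowLinkOptions { PropagateCompletion = true });
    }
}

class Program
{
    static async Task Main()
    {
        var options = new DataflowLinkOptions { PropagateCompletion = true };

        // Stand-in for whatever block precedes the fragment in the pipeline.
        var source = new BufferBlock<string>();
        var fragment = new UpperCaseFragment();

        // Link the upstream block to the fragment's exposed first step.
        source.LinkTo(fragment.Input, options);

        source.Post("hello");
        source.Post("world");
        source.Complete();

        // Await the exposed Completion of the fragment's last step.
        await fragment.Completion;
    }
}
```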

You could stop here if you want, or you can implement ITargetBlock to make the fragment look like a Target block. You just need to delegate all methods to the Input block and the Completion property to the final block.

Eg:

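class MyFragment:ITargetBlock<SomeMessage> 
{
    ....

    public Task Completion => _finalBlock.Completion;

    public void Complete()
    {
        Input.Complete();
    }

    public void Fault(Exception exc)
    {
        // TransformBlock implements Fault explicitly, so call it through the interface
        ((IDataflowBlock)Input).Fault(exc);
    }

    public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
        SomeMessage messageValue,ISourceBlock<SomeMessage> source,bool consumeToAccept)
    {
        // OfferMessage is also an explicit interface implementation on TransformBlock
        return ((ITargetBlock<SomeMessage>)Input).OfferMessage(messageHeader,messageValue,source,consumeToAccept);
    }
}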

EDIT 2

Using @bornfromanegg's EncapsulatingTarget class, one can separate the act of building the fragment from the boilerplate that exposes the Input and Completion:

public ITargetBlock<SomeMessage> BuildMyFragment()
{
    var input=new TransformBlock<SomeMessage,SomeOther>(MyFunction);
    var step2=new TransformBlock<SomeOther,SomeFinal>(MyFunction2);
    var finalBlock=new ActionBlock<SomeFinal>(MyMethod);

    var linkOptions = new DataflowLinkOptions {PropagateCompletion = true};

    input.LinkTo(step2,linkOptions);
    step2.LinkTo(finalBlock,linkOptions);

    // The wrapper is generic, so the first input type and last input type must be given
    return new EncapsulatingTarget<SomeMessage,SomeFinal>(input,finalBlock);
}
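To see the whole thing run end to end, here is a self-contained sketch. It includes a compact copy of the wrapper so it compiles on its own; the `parse`, `square` and `print` stages and the `BuildFragment` name are made up for illustration:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Compact copy of the asker's wrapper, repeated so the sketch is self-contained.
sealed class EncapsulatingTarget<TStart, TEnd> : ITargetBlock<TStart>
{
    private readonly ITargetBlock<TStart> _start;
    private readonly ActionBlock<TEnd> _end;

    public EncapsulatingTarget(ITargetBlock<TStart> start, ActionBlock<TEnd> end)
    {
        _start = start;
        _end = end;
    }

    public Task Completion => _end.Completion;

    public void Complete() => _start.Complete();

    public void Fault(Exception exception) => _start.Fault(exception);

    public DataflowMessageStatus OfferMessage(DataflowMessageHeader header, TStart value,
        ISourceBlock<TStart> source, bool consumeToAccept)
        => _start.OfferMessage(header, value, source, consumeToAccept);
}

class Program
{
    // Hypothetical three-stage fragment: parse, square, print.
    static ITargetBlock<string> BuildFragment()
    {
        var options = new DataflowLinkOptions { PropagateCompletion = true };
        var parse = new TransformBlock<string, int>(s => int.Parse(s));
        var square = new TransformBlock<int, int>(n => n * n);
        var print = new ActionBlock<int>(n => Console.WriteLine(n));

        parse.LinkTo(square, options);
        square.LinkTo(print, options);

        return new EncapsulatingTarget<string, int>(parse, print);
    }

    static async Task Main()
    {
        ITargetBlock<string> fragment = BuildFragment();

        // Callers see a single target block and never touch the inner stages.
        fragment.Post("3");
        fragment.Post("4");
        fragment.Complete();

        await fragment.Completion;
    }
}
```

Because the caller only holds an ITargetBlock<string>, the inner stages can be rearranged later without changing any code that posts to or awaits the fragment.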
Panagiotis Kanavos answered Oct 18 '22 12:10