
TPL Dataflow Blocks using LinkTo Predicate

I have some blocks that eventually go from a TransformBlock to one of three other transform blocks, chosen by the LinkTo predicate. I am using DataflowLinkOptions to propagate the completion. The problem is that when a predicate is satisfied and that block is started, the rest of my pipeline continues on. It would seem that the pipeline should wait for this block to finish first.

The code looks something like this:

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
mainBlock.LinkTo(block1, linkOptions, x => x.Status == Status.Complete);
mainBlock.LinkTo(block2, linkOptions, x => x.Status == Status.Cancelled);
mainBlock.LinkTo(block3, linkOptions, x => x.Status == Status.Delayed);
mainBlock.LinkTo(DataflowBlock.NullTarget<Thing>(), linkOptions);

As I said, this doesn't work the way I'd expect, so the only way I've found to get the behavior I want is to take the linkOptions out and add the following to the lambda for the mainBlock:

mainBlock = new TransformBlock<Thing,Thing>(input =>
{
    DoMyStuff(input);

    if (input.Status == Status.Complete)
    {
        mainBlock.Completion.ContinueWith(t => block1.Complete());
    }
    if (input.Status == Status.Cancelled)
    {
        mainBlock.Completion.ContinueWith(t => block2.Complete());
    }
    if (input.Status == Status.Delayed)
    {
        mainBlock.Completion.ContinueWith(t => block3.Complete());
    }

    return input;
});

So the question is: is this the only way to get this to work?

BTW, this has been run in my unit test with a single data item going through it, to try and debug the pipeline behavior. Each block has been tested individually with multiple unit tests. In my pipeline unit test, the assert is hit before the block has finished executing, so the test fails.

If I remove the block2 and block3 links and debug the test using the linkOptions it works fine.

jmichas asked Mar 19 '23 13:03


1 Answer

Your problem is not with the code in your question; that works correctly: when the main block completes, all three follow-up blocks are marked for completion too.

The problem is with the end block: you're using PropagateCompletion there too, which means that when any one of the three previous blocks completes, the end block is marked for completion. What you want is to mark it for completion only when all three blocks have completed, and the Task.WhenAll().ContinueWith() combination from your answer does that (though the first part of that snippet is unnecessary, since it does exactly what PropagateCompletion would).
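
To make the Task.WhenAll().ContinueWith() combination concrete, here is a minimal sketch. The block bodies and the int/string item types are hypothetical stand-ins (the original pipeline uses Thing), and endBlock is assumed to be a simple ActionBlock; only the linking and completion logic matters here.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class Program
{
    public static async Task Main()
    {
        // Hypothetical stand-ins for the three middle blocks; each just tags its input.
        var block1 = new TransformBlock<int, string>(i => $"block1:{i}");
        var block2 = new TransformBlock<int, string>(i => $"block2:{i}");
        var block3 = new TransformBlock<int, string>(i => $"block3:{i}");

        // Assumed end block: prints whatever reaches it.
        var endBlock = new ActionBlock<string>(s => Console.WriteLine(s));

        // Link WITHOUT PropagateCompletion, so no single source can complete endBlock.
        block1.LinkTo(endBlock);
        block2.LinkTo(endBlock);
        block3.LinkTo(endBlock);

        // Complete endBlock only once all three sources have finished.
        _ = Task.WhenAll(block1.Completion, block2.Completion, block3.Completion)
                .ContinueWith(_ => endBlock.Complete());

        block1.Post(1);
        block2.Post(2);
        block3.Post(3);

        block1.Complete();
        block2.Complete();
        block3.Complete();

        // Safe point for a test to assert: everything has flowed through endBlock.
        await endBlock.Completion;
    }
}
```

The three lines may print in any order. Note that this sketch calls Complete() on the end block even if one of the sources faulted, whereas PropagateCompletion would pass the fault along; whether that matters depends on how your test inspects failures.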

"As it turns out, the link option propagation (at least this is my guess) will propagate the completion for blocks that don't satisfy the predicate in the LinkTo."

Yes, it propagates completion always. Completion doesn't have any item associated with it, so it doesn't make any sense to apply the predicate to it. Maybe the fact that you always have only a single item (which is not common) makes this more confusing for you?
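
A small sketch of that behavior, under some simplifying assumptions: the items are bare Status values rather than Thing objects, the source is a BufferBlock, and the two targets are hypothetical ActionBlocks that only count what they receive. The single posted item never matches the second predicate, yet the second target still completes.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public enum Status { Complete, Cancelled, Delayed }

public static class Program
{
    public static async Task Main()
    {
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

        var source = new BufferBlock<Status>();
        var completeCount = 0;
        var delayedCount = 0;

        // Hypothetical targets that only count the items they are given.
        var completeBlock = new ActionBlock<Status>(_ => completeCount++);
        var delayedBlock = new ActionBlock<Status>(_ => delayedCount++);

        // The predicate filters items; it plays no part in completion.
        source.LinkTo(completeBlock, linkOptions, s => s == Status.Complete);
        source.LinkTo(delayedBlock, linkOptions, s => s == Status.Delayed);

        source.Post(Status.Complete); // the only item; it never matches delayedBlock
        source.Complete();

        // Both targets complete, including the one that received nothing.
        await Task.WhenAll(completeBlock.Completion, delayedBlock.Completion);

        Console.WriteLine($"complete items: {completeCount}, delayed items: {delayedCount}");
        Console.WriteLine($"delayedBlock status: {delayedBlock.Completion.Status}");
    }
}
```

If this runs as intended, it should report one complete item, zero delayed items, and a delayedBlock status of RanToCompletion.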

"If my guess is correct I sort of feel like this is a bug or design error in the link option completion propagation. Why should a block be complete if it was never used?"

Why shouldn't it? To me, this makes perfect sense: even when there were no items with Status.Delayed this time around, you still want to complete the block that processes those items, so that any follow-up code can know that all delayed items were already processed. And the fact that there weren't any doesn't matter.


Anyway, if you encounter this often, you might want to create a helper method that links several source blocks to a single target block at the same time and propagates completion correctly:

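using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// The class name is arbitrary; extension methods just need a static class.
public static class DataflowExtensions
{
    // Links every source to the same target and, if requested, completes the
    // target only after all of the sources have completed.
    public static void LinkTo<T>(
        this IReadOnlyCollection<ISourceBlock<T>> sources, ITargetBlock<T> target,
        bool propagateCompletion)
    {
        foreach (var source in sources)
        {
            source.LinkTo(target);
        }

        if (propagateCompletion)
            Task.WhenAll(sources.Select(source => source.Completion))
                .ContinueWith(_ => target.Complete());
    }
}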

Usage:

new[] { block1, block2, block3 }.LinkTo(endBlock, propagateCompletion: true);

svick answered Mar 24 '23 10:03