I have some blocks that eventually go from a TransformBlock to one of three other transform blocks based on the LinkTo predicate. I am using DataflowLinkOptions to propagate the completion. The problem is that when a predicate is satisfied and that block is started the rest of my pipeline continues on. It would seem that the pipeline should wait for this block to finish first.
The code looks something like this:
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
// Route each item to the block whose predicate matches its status.
mainBlock.LinkTo(block1, linkOptions, x => x.Status == Status.Complete);
mainBlock.LinkTo(block2, linkOptions, x => x.Status == Status.Cancelled);
mainBlock.LinkTo(block3, linkOptions, x => x.Status == Status.Delayed);
// Discard anything that matches none of the predicates above.
mainBlock.LinkTo(DataflowBlock.NullTarget<Thing>(), linkOptions);
Now, as I said, this doesn't work as I'd expect, so the only way I've found to get the behavior I want is to take the linkOptions out and add the following to the lambda for the mainBlock:
mainBlock = new TransformBlock<Thing, Thing>(input =>
{
    DoMyStuff(input);
    // Complete only the block that matches this item's status,
    // and only after mainBlock itself has completed.
    if (input.Status == Status.Complete)
    {
        mainBlock.Completion.ContinueWith(t => block1.Complete());
    }
    if (input.Status == Status.Cancelled)
    {
        mainBlock.Completion.ContinueWith(t => block2.Complete());
    }
    if (input.Status == Status.Delayed)
    {
        mainBlock.Completion.ContinueWith(t => block3.Complete());
    }
    return input;
});
So the question is: is this the only way to get this to work?
BTW, this has been run in my unit test with a single data item running through it to try and debug the pipeline behavior. Each block has been tested individually with multiple unit tests. So what happens in my pipeline unit test is that the assert is hit before the block finished executing and so fails.
If I remove the block2 and block3 links and debug the test using the linkOptions it works fine.
Your problem is not with the code in your question; that works correctly: when the main block completes, all three follow-up blocks are marked for completion too.
The problem is with the end block: you're using PropagateCompletion there too, which means that when any of the three previous blocks completes, the end block is marked for completion. What you want is to mark it for completion when all three blocks complete, and the Task.WhenAll().ContinueWith() combination from your answer does that (though the first part of that snippet is unnecessary, since it does exactly the same thing PropagateCompletion would).
As it turns out, the link option propagation (at least this is my guess) will propagate the completion for blocks that don't satisfy the predicate in the LinkTo.
Yes, it propagates completion always. Completion doesn't have any item associated with it, so it doesn't make any sense to apply the predicate to it. Maybe the fact that you always have only a single item (which is not common) makes this more confusing for you?
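To see this in isolation, here is a minimal sketch, assuming the System.Threading.Tasks.Dataflow package is referenced. The names CompletionDemo, UnusedTargetCompletesAsync, evens and odds are made up for illustration and are not from your pipeline. Only one item is posted, and it matches only the first predicate, yet both targets should complete once the source does:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CompletionDemo
{
    // Returns true if both predicate-linked targets complete after the
    // source completes, including the one that never received an item.
    public static async Task<bool> UnusedTargetCompletesAsync()
    {
        var options = new DataflowLinkOptions { PropagateCompletion = true };

        var source = new BufferBlock<int>();
        var evens = new ActionBlock<int>(_ => { });
        var odds = new ActionBlock<int>(_ => { });

        // The predicate filters items only; completion is not an item.
        source.LinkTo(evens, options, x => x % 2 == 0);
        source.LinkTo(odds, options, x => x % 2 != 0);

        source.Post(2);     // matches only the "evens" predicate
        source.Complete();  // propagated to both linked targets

        // Wait for both targets, with a timeout so the sketch cannot hang.
        var both = Task.WhenAll(evens.Completion, odds.Completion);
        var finished = await Task.WhenAny(both, Task.Delay(TimeSpan.FromSeconds(5)));
        return finished == both;
    }
}
```

If this returns true, then odds completed even though nothing was ever routed to it, which is the behavior described above.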
If my guess is correct, I sort of feel like this is a bug or design error in the link option completion propagation. Why should a block be complete if it was never used?
Why shouldn't it? To me, this makes perfect sense: even when there were no items with Status.Delayed this time around, you still want to complete the block that processes those items, so that any follow-up code can know that all delayed items were already processed. And the fact that there weren't any doesn't matter.
Anyway, if you encounter this often, you might want to create a helper method that links several source blocks to a single target block at the same time and propagates completion correctly:
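// Requires: using System.Collections.Generic; using System.Linq;
// using System.Threading.Tasks; using System.Threading.Tasks.Dataflow;
// As an extension method, this must be declared inside a static class.
public static void LinkTo<T>(
    this IReadOnlyCollection<ISourceBlock<T>> sources, ITargetBlock<T> target,
    bool propagateCompletion)
{
    // Link every source to the shared target, without per-link completion propagation.
    foreach (var source in sources)
    {
        source.LinkTo(target);
    }

    // Complete the target only once all of the sources have completed.
    if (propagateCompletion)
        Task.WhenAll(sources.Select(source => source.Completion))
            .ContinueWith(_ => target.Complete());
}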
Usage:
new[] { block1, block2, block3 }.LinkTo(endBlock, propagateCompletion: true);