How can I rewrite the code so that it completes only when BOTH transform blocks have completed? I thought completion meant that a block is marked complete AND its output queue is empty.
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public class Test
{
    // Field declarations were not shown in the original snippet;
    // the types are inferred from the constructor below.
    private BroadcastBlock<int> broadCastBlock;
    private TransformBlock<int, string> transformBlock1;
    private TransformBlock<int, string> transformBlock2;
    private ActionBlock<string> processorBlock;

    public Test()
    {
        broadCastBlock = new BroadcastBlock<int>(i => { return i; });

        transformBlock1 = new TransformBlock<int, string>(i =>
        {
            Console.WriteLine("1 input count: " + transformBlock1.InputCount);
            Thread.Sleep(50);
            return ("1_" + i);
        });

        transformBlock2 = new TransformBlock<int, string>(i =>
        {
            // Report this block's own input count (the original printed transformBlock1's).
            Console.WriteLine("2 input count: " + transformBlock2.InputCount);
            Thread.Sleep(20);
            return ("2_" + i);
        });

        processorBlock = new ActionBlock<string>(i => { Console.WriteLine(i); });

        //Linking
        broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
        broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public void Start()
    {
        const int numElements = 100;

        for (int i = 1; i <= numElements; i++)
        {
            broadCastBlock.SendAsync(i);
        }

        //mark completion
        broadCastBlock.Complete();

        processorBlock.Completion.Wait();

        Console.WriteLine("Finished");
        Console.ReadLine();
    }
}
I edited the code, adding an input buffer count for each transform block. Clearly all 100 items are streamed to each of the transform blocks. But as soon as one of the transform blocks finishes, the processor block does not accept any more items, and the transform block that is still running simply drains its input buffer.
TPL Dataflow is a data processing library from Microsoft. It consists of different "blocks" that you compose together to make a pipeline; each block corresponds to a stage in that pipeline.
The issue is exactly what casperOne said in his answer. Once the first transform block completes, the processor block goes into “finishing mode”: it will process remaining items in its input queue, but it won't accept any new items.
There is a simpler fix than splitting your processor block in two, though: don't set PropagateCompletion on the links into the processor block, but instead complete the processor block manually when both transform blocks complete:
Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion)
    .ContinueWith(_ => processorBlock.Complete());
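To see the manual-completion approach end to end, here is a minimal sketch of the same diamond-shaped pipeline. The class name ManualCompletionSketch, the RunAsync method, the shortened sleep times, and the ConcurrentQueue used to collect results are all assumptions made for illustration; they are not part of the original code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper class, introduced only for this sketch.
public static class ManualCompletionSketch
{
    // Runs the pipeline and returns every string the processor block received.
    public static async Task<string[]> RunAsync(int numElements)
    {
        var results = new ConcurrentQueue<string>();

        var broadcast = new BroadcastBlock<int>(i => i);
        var transform1 = new TransformBlock<int, string>(i => { Thread.Sleep(5); return "1_" + i; });
        var transform2 = new TransformBlock<int, string>(i => { Thread.Sleep(2); return "2_" + i; });
        var processor = new ActionBlock<string>(s => results.Enqueue(s));

        // Completion still propagates from the broadcast block to both transform blocks.
        var propagate = new DataflowLinkOptions { PropagateCompletion = true };
        broadcast.LinkTo(transform1, propagate);
        broadcast.LinkTo(transform2, propagate);

        // No PropagateCompletion on the links into the shared processor block.
        transform1.LinkTo(processor);
        transform2.LinkTo(processor);

        // Complete the processor only after BOTH transform blocks have completed.
        Task completeProcessor = Task
            .WhenAll(transform1.Completion, transform2.Completion)
            .ContinueWith(t => processor.Complete());

        for (int i = 1; i <= numElements; i++)
        {
            await broadcast.SendAsync(i);
        }

        broadcast.Complete();
        await completeProcessor;
        await processor.Completion;
        return results.ToArray();
    }
}
```

Calling ManualCompletionSketch.RunAsync(100) should yield 200 strings, 100 from each transform block. Note that this sketch calls Complete() on the processor even if a transform block faults; if faults should reach the processor, the continuation would have to inspect the tasks and call Fault() itself.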
The issue here is the combination of setting the PropagateCompletion property each time you call the LinkTo method to link the blocks and the difference in wait times in your transformation blocks.
From the documentation for the Complete method on the IDataflowBlock interface (emphasis mine):
Signals to the IDataflowBlock that it should not accept nor produce any more messages nor consume any more postponed messages.
Because you stagger the wait times in each of the TransformBlock<TInput, TOutput> instances, transformBlock2 (waiting for 20 ms) finishes before transformBlock1 (waiting for 50 ms). transformBlock2 completes first and then sends the signal to processorBlock, which then says "I'm not accepting anything else" (and transformBlock1 hasn't produced all of its messages yet).
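The "I'm not accepting anything else" behavior can be observed in isolation. The following is a minimal sketch with a single ActionBlock; the class name CompleteDeclinesSketch and its Run method are hypothetical names used only here.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper class, introduced only for this sketch.
public static class CompleteDeclinesSketch
{
    // Returns whether Post was accepted before and after Complete(),
    // plus the number of items the block actually processed.
    public static (bool before, bool after, int processed) Run()
    {
        var seen = new List<string>();

        // Default options: one invocation at a time, so List<T>.Add is safe here.
        var block = new ActionBlock<string>(s => seen.Add(s));

        bool before = block.Post("2_1"); // accepted: the block is still open
        block.Complete();                // what a completion-propagating link does for you
        bool after = block.Post("1_1");  // declined: the block takes no new messages

        block.Completion.Wait();         // items queued before Complete() are still processed
        return (before, after, seen.Count);
    }
}
```

Here the item posted before Complete() is processed and the one posted afterwards is declined, which mirrors what happens to the remaining output of transformBlock1 in the question.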
Note that transformBlock2 finishing before transformBlock1 is not absolutely guaranteed; it's feasible that the thread pool (assuming you're using the default scheduler) will process the tasks in a different order (but more than likely it will not, as it will steal work from the queues once the 20 ms items are done).
Your pipeline looks like this:
         broadcastBlock
          /          \
transformBlock1    transformBlock2
          \          /
         processorBlock
In order to get around this, you want to have a pipeline that looks like this:
         broadcastBlock
          /          \
transformBlock1    transformBlock2
       |                  |
processorBlock1    processorBlock2
This is accomplished by creating two separate ActionBlock<TInput> instances, like so:
// The action, can be a method, makes it easier to share.
Action<string> a = i => Console.WriteLine(i);

// Create the processor blocks.
processorBlock1 = new ActionBlock<string>(a);
processorBlock2 = new ActionBlock<string>(a);

// Linking
broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(processorBlock1, new DataflowLinkOptions { PropagateCompletion = true });
transformBlock2.LinkTo(processorBlock2, new DataflowLinkOptions { PropagateCompletion = true });
You then need to wait on both processor blocks instead of just one:
Task.WhenAll(processorBlock1.Completion, processorBlock2.Completion).Wait();
A very important note here: when creating an ActionBlock<TInput>, the default is to have the MaxDegreeOfParallelism property on the ExecutionDataflowBlockOptions instance passed to it set to one.
This means that the calls to the Action<T> delegate that you pass to a single ActionBlock<TInput> are effectively thread-safe, because only one will execute at a time.
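One way to check this default for yourself is to count how many invocations of the delegate overlap. This is a rough sketch; the class name DefaultParallelismSketch, the MaxObservedConcurrency method, and the 5 ms sleep are assumptions chosen for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper class, introduced only for this sketch.
public static class DefaultParallelismSketch
{
    // Returns the highest number of delegate invocations seen running at once.
    public static int MaxObservedConcurrency(int items)
    {
        var gate = new object();
        int current = 0;
        int max = 0;

        // No options passed, so MaxDegreeOfParallelism keeps its default of 1.
        var block = new ActionBlock<int>(i =>
        {
            int now = Interlocked.Increment(ref current);
            lock (gate)
            {
                if (now > max) max = now;
            }

            Thread.Sleep(5); // simulate work so that overlapping calls would be visible
            Interlocked.Decrement(ref current);
        });

        for (int i = 0; i < items; i++)
        {
            block.Post(i);
        }

        block.Complete();
        block.Completion.Wait();
        return max;
    }
}
```

With the default options, the observed maximum for a single block stays at one, regardless of how many items are posted.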
Because you now have two ActionBlock<TInput> instances pointing to the same Action<T> delegate, you aren't guaranteed thread-safety.
If your method is thread-safe, then you don't have to do anything (which would also allow you to set the MaxDegreeOfParallelism property to DataflowBlockOptions.Unbounded, since there's no reason to block).
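For a thread-safe delegate, raising the limit could look like the sketch below. The class name UnboundedParallelismSketch, the ProcessAll method, and the simulated 5 ms of work are hypothetical; only ExecutionDataflowBlockOptions, MaxDegreeOfParallelism, and DataflowBlockOptions.Unbounded come from the library.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper class, introduced only for this sketch.
public static class UnboundedParallelismSketch
{
    // Posts the given number of items and returns how many were processed.
    public static int ProcessAll(int items)
    {
        int processed = 0;

        var options = new ExecutionDataflowBlockOptions
        {
            // Let the block run as many invocations concurrently as the scheduler allows.
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        };

        var block = new ActionBlock<int>(i =>
        {
            Thread.Sleep(5);                      // simulate work
            Interlocked.Increment(ref processed); // thread-safe counter update
        }, options);

        for (int i = 0; i < items; i++)
        {
            block.Post(i);
        }

        block.Complete();
        block.Completion.Wait();
        return processed;
    }
}
```

Because invocations may now overlap, everything the delegate touches has to be thread-safe, which is why the counter is updated with Interlocked rather than a plain increment.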
If it's not thread-safe, and you need to guarantee it, you need to resort to traditional synchronization primitives, like the lock statement.
In this case, you'd do it like so (although it's clearly not needed, as the WriteLine method on the Console class is thread-safe):
// The lock.
var l = new object();

// The action, can be a method, makes it easier to share.
Action<string> a = i =>
{
    // Ensure one call at a time.
    lock (l) Console.WriteLine(i);
};

// And so on...