The scheduling of async
Task
s does not work as I expected in TPL Dataflow. In the example below, I expected the ActionBlock
to process data from the TransformBlock
as soon as it is available. But it is waiting on the second (delayed) result before it proceeds to the third. What have I misunderstood here? Is there some requirement on the order of processing?
public class TestDataFlow
{
public System.Diagnostics.Stopwatch watch = new System.Diagnostics.Stopwatch();
public async Task Flow()
{
watch.Start();
var plus10 = new TransformBlock<int, int>(async input =>
{
if (input == 2)
{
await Task.Delay(5000);
}
Console.WriteLine("Exiting plus10 for input {0} @ {1}", input, watch.Elapsed);
return input + 10;
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4,
});
var printSolution = new ActionBlock<int>(input =>
{
Console.WriteLine("Solution: {0} @ {1}", input, watch.Elapsed.TotalMilliSeconds);
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4,
});
plus10.LinkTo(printSolution);
List<int> inputs = new List<int> { 1, 2, 3 };
foreach (var input in inputs)
{
await plus10.SendAsync(input);
}
}
}
Output:
Exiting plus10 for input 1 @ 115.8583
Exiting plus10 for input 3 @ 116.6973
Solution: 11 @ 126.0146
Exiting plus10 for input 2 @ 5124.4074
Solution: 12 @ 5124.9014
Solution: 13 @ 5126.4834
TPL Dataflow guarantees order of the input and output queues no matter how many items are processed in parallel.
"Because each predefined source dataflow block type guarantees that messages are propagated out in the order in which they are received, every message must be read from the source block before the source block can process the next message"
From Dataflow (Task Parallel Library)
If you want items to move on to the next block exactly when they are finished processing you should transfer them explicitly yourself which turns your TransformBlock
to an ActionBlock
:
var printSolution = new ActionBlock<int>(input =>
{
Console.WriteLine("Solution: {0} @ {1}", input, watch.Elapsed.TotalMilliSeconds);
},executionDataflowBlockOptions);
var plus10 = new ActionBlock<int>(async input =>
{
if (input == 2)
{
await Task.Delay(5000);
}
Console.WriteLine("Exiting plus10 for input {0} @ {1}", input, watch.Elapsed);
await printSolution.SendAsync(input + 10);
}, executionDataflowBlockOptions);
As of (at least) System.Threading.Tasks.Dataflow.4.6.0
, ExecutionDataflowBlockOptions
now has a property EnsureOrdered
which may be set to false
.
To update:
Install-Package System.Threading.Tasks.Dataflow
Code:
var options = new ExecutionDataflowBlockOptions {
EnsureOrdered = false
};
var transform = new TransformBlock<int, int>(i => Transform(i), options);
Some more examples: https://stackoverflow.com/a/38865414/625919
Development history, which I thought was neat: https://github.com/dotnet/corefx/issues/536 https://github.com/dotnet/corefx/pull/5191
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With