
TPL Dataflow async scheduling

The scheduling of async Tasks does not work as I expected in TPL Dataflow. In the example below, I expected the ActionBlock to process data from the TransformBlock as soon as it is available. But it is waiting on the second (delayed) result before it proceeds to the third. What have I misunderstood here? Is there some requirement on the order of processing?

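using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class TestDataFlow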
{
    public System.Diagnostics.Stopwatch watch = new System.Diagnostics.Stopwatch();

    public async Task Flow()
    {
        watch.Start();

        var plus10 = new TransformBlock<int, int>(async input =>
        {
            if (input == 2)
            {
                await Task.Delay(5000);
            }
            Console.WriteLine("Exiting plus10 for input {0} @ {1}", input, watch.Elapsed.TotalMilliseconds);
            return input + 10;
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
        });

        var printSolution = new ActionBlock<int>(input =>
        {
            Console.WriteLine("Solution: {0} @ {1}", input, watch.Elapsed.TotalMilliseconds);
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
        });

        plus10.LinkTo(printSolution);

        List<int> inputs = new List<int> { 1, 2, 3 };
        foreach (var input in inputs)
        {
            await plus10.SendAsync(input);
        }
    }
}

Output:

Exiting plus10 for input 1 @ 115.8583
Exiting plus10 for input 3 @ 116.6973
Solution: 11 @ 126.0146
Exiting plus10 for input 2 @ 5124.4074
Solution: 12 @ 5124.9014
Solution: 13 @ 5126.4834
Petter T asked Nov 27 '14 12:11


2 Answers

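TPL Dataflow preserves message order by default: a block emits its outputs in the same order in which it received the corresponding inputs, no matter how many items are processed in parallel. In your example the result for input 3 is ready almost immediately, but the TransformBlock holds it back until the delayed result for input 2 has been emitted.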

"Because each predefined source dataflow block type guarantees that messages are propagated out in the order in which they are received, every message must be read from the source block before the source block can process the next message"

From Dataflow (Task Parallel Library)
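
To see the ordering guarantee in isolation, here is a minimal sketch, not taken from the question. The class name `OrderedDemo`, the `collect` block, and the shortened 500 ms delay are assumptions made for illustration. It sends 1, 2, 3 through a parallel TransformBlock with default options and records the order in which the results come out.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class; the name is not from the original post.
public static class OrderedDemo
{
    // Returns the outputs in the order the TransformBlock emitted them.
    public static async Task<List<int>> RunAsync()
    {
        var plus10 = new TransformBlock<int, int>(async input =>
        {
            if (input == 2)
            {
                await Task.Delay(500); // input 2 finishes processing last
            }
            return input + 10;
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        var results = new List<int>();
        // Default MaxDegreeOfParallelism is 1, so the list is only
        // touched by one delegate invocation at a time.
        var collect = new ActionBlock<int>(value => results.Add(value));

        plus10.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var input in new[] { 1, 2, 3 })
        {
            await plus10.SendAsync(input);
        }

        plus10.Complete();        // no more input
        await collect.Completion; // wait until everything has flowed through
        return results;
    }

    public static async Task Main()
    {
        // Output order matches input order even though 13 was ready before 12.
        Console.WriteLine(string.Join(", ", await RunAsync())); // 11, 12, 13
    }
}
```

Even though the result for input 3 is computed well before the one for input 2, it is only released after 12, which is the behaviour described in the question.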

If you want each item to move on to the next block as soon as it has finished processing, you have to pass it on explicitly yourself, which turns your TransformBlock into an ActionBlock:

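// Options shared by both blocks (same settings as in the question).
var executionDataflowBlockOptions = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 4,
};

var printSolution = new ActionBlock<int>(input =>
{
    Console.WriteLine("Solution: {0} @ {1}", input, watch.Elapsed.TotalMilliseconds);
}, executionDataflowBlockOptions);

var plus10 = new ActionBlock<int>(async input =>
{
    if (input == 2)
    {
        await Task.Delay(5000);
    }
    Console.WriteLine("Exiting plus10 for input {0} @ {1}", input, watch.Elapsed);
    // Forward the result by hand instead of relying on LinkTo.
    await printSolution.SendAsync(input + 10);
}, executionDataflowBlockOptions);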
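
Because the two blocks are no longer linked, completion is not propagated automatically and has to be handled by hand. The following is a rough sketch of one way to do that. The class name `ForwardingDemo`, the result list, and the 500 ms delay are assumptions for illustration, not part of the code above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class; the name is not from the original post.
public static class ForwardingDemo
{
    // Returns the results in the order the downstream block received them.
    public static async Task<List<int>> RunAsync()
    {
        var results = new List<int>();
        // Downstream block kept at the default parallelism of 1 so that
        // adding to the list needs no locking.
        var printSolution = new ActionBlock<int>(value => results.Add(value));

        var plus10 = new ActionBlock<int>(async input =>
        {
            if (input == 2)
            {
                await Task.Delay(500);
            }
            // Hand the result over as soon as this item is done.
            await printSolution.SendAsync(input + 10);
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        foreach (var input in new[] { 1, 2, 3 })
        {
            await plus10.SendAsync(input);
        }

        // No LinkTo here, so complete the blocks one after the other.
        plus10.Complete();
        await plus10.Completion;
        printSolution.Complete();
        await printSolution.Completion;
        return results;
    }

    public static async Task Main()
    {
        // The delayed result 12 is expected to arrive last.
        Console.WriteLine(string.Join(", ", await RunAsync()));
    }
}
```

Waiting for `plus10.Completion` before completing `printSolution` matters here: completing the downstream block too early would cause the remaining `SendAsync` calls to be declined.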
i3arnon answered Sep 20 '22 17:09


As of (at least) System.Threading.Tasks.Dataflow 4.6.0, ExecutionDataflowBlockOptions has an EnsureOrdered property. It defaults to true; setting it to false lets a block emit each result as soon as it is ready.

To update:

Install-Package System.Threading.Tasks.Dataflow

Code:

var options = new ExecutionDataflowBlockOptions {
  EnsureOrdered = false
};
var transform = new TransformBlock<int, int>(i => Transform(i), options);
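
Applied to the pipeline from the question, this could look roughly like the sketch below. It assumes a package version that has `EnsureOrdered`; the class name `UnorderedDemo`, the `collect` block, and the 500 ms delay are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class; the name is not from the original post.
public static class UnorderedDemo
{
    // Returns the outputs in the order the TransformBlock emitted them.
    public static async Task<List<int>> RunAsync()
    {
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
            EnsureOrdered = false // emit results as soon as they are ready
        };

        var plus10 = new TransformBlock<int, int>(async input =>
        {
            if (input == 2)
            {
                await Task.Delay(500);
            }
            return input + 10;
        }, options);

        var results = new List<int>();
        // Default parallelism of 1 keeps the list access single-threaded.
        var collect = new ActionBlock<int>(value => results.Add(value));

        plus10.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var input in new[] { 1, 2, 3 })
        {
            await plus10.SendAsync(input);
        }

        plus10.Complete();
        await collect.Completion;
        return results;
    }

    public static async Task Main()
    {
        // With ordering disabled, 13 no longer waits behind the delayed 12.
        Console.WriteLine(string.Join(", ", await RunAsync()));
    }
}
```

Compared with forwarding results by hand, this keeps the `LinkTo` wiring and completion propagation intact, at the cost of requiring the newer package.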

Some more examples: https://stackoverflow.com/a/38865414/625919

Development history, which I thought was neat: https://github.com/dotnet/corefx/issues/536 https://github.com/dotnet/corefx/pull/5191

DharmaTurtle answered Sep 20 '22 17:09