
TPL Dataflow: Bounded capacity and waiting for completion

Below I have replicated a real-life scenario as a LINQPad script, for the sake of simplicity:

var total = 1 * 1000 * 1000;
var cts = new CancellationTokenSource();
var threads = Environment.ProcessorCount;
int capacity = 10;

var edbOptions = new ExecutionDataflowBlockOptions{BoundedCapacity = capacity, CancellationToken = cts.Token, MaxDegreeOfParallelism = threads};
var dbOptions = new DataflowBlockOptions {BoundedCapacity = capacity, CancellationToken = cts.Token};
var gdbOptions = new GroupingDataflowBlockOptions {BoundedCapacity = capacity, CancellationToken = cts.Token};
var dlOptions = new DataflowLinkOptions {PropagateCompletion = true};

var counter1 = 0;
var counter2 = 0;

var delay1 = 10;
var delay2 = 25;

var action1 = new Func<IEnumerable<string>, Task>(async x => {await Task.Delay(delay1); Interlocked.Increment(ref counter1);});
var action2 = new Func<IEnumerable<string>, Task>(async x => {await Task.Delay(delay2); Interlocked.Increment(ref counter2);});

var actionBlock1 = new ActionBlock<IEnumerable<string>>(action1, edbOptions);
var actionBlock2 = new ActionBlock<IEnumerable<string>>(action2, edbOptions);

var batchBlock1 = new BatchBlock<string>(5, gdbOptions);
var batchBlock2 = new BatchBlock<string>(5, gdbOptions);

batchBlock1.LinkTo(actionBlock1, dlOptions);
batchBlock2.LinkTo(actionBlock2, dlOptions);

var bufferBlock1 = new BufferBlock<string>(dbOptions); 
var bufferBlock2 = new BufferBlock<string>(dbOptions); 

bufferBlock1.LinkTo(batchBlock1, dlOptions);
bufferBlock2.LinkTo(batchBlock2, dlOptions);

var bcBlock = new BroadcastBlock<string>(x => x, dbOptions);

bcBlock.LinkTo(bufferBlock1, dlOptions);
bcBlock.LinkTo(bufferBlock2, dlOptions);

var mainBlock = new TransformBlock<int, string>(x => x.ToString(), edbOptions);
mainBlock.LinkTo(bcBlock, dlOptions);

mainBlock.Dump("Main Block");
bcBlock.Dump("Broadcast Block");
bufferBlock1.Dump("Buffer Block 1");
bufferBlock2.Dump("Buffer Block 2");
actionBlock1.Dump("Action Block 1");
actionBlock2.Dump("Action Block 2");

foreach(var i in Enumerable.Range(1, total))
  await mainBlock.SendAsync(i, cts.Token);

mainBlock.Complete();

await Task.WhenAll(actionBlock1.Completion, actionBlock2.Completion);

counter1.Dump("Counter 1");
counter2.Dump("Counter 2");

I have two issues with this code:

  1. Although I limited BoundedCapacity of all appropriate blocks to 10 elements, it seems like I can push all 1,000,000 messages almost at once. Is this expected behavior?
  2. Although the entire network is configured to propagate completion, it seems like all blocks complete almost immediately after calling mainBlock.Complete(). I expect both counter1 and counter2 to be equal to total. Is there a way to achieve such behavior?
Yuriy Magurdumov asked May 28 '14 18:05



1 Answer

Yes, this is the expected behavior, because of the BroadcastBlock:

Provides a buffer for storing at most one element at time, overwriting each message with the next as it arrives.

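This means that if you link a BroadcastBlock to blocks with BoundedCapacity, you will lose messages: the BroadcastBlock does not wait for a target that is full. It moves on to the next message, and that target never sees the one it could not accept.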
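The message loss can be observed in isolation. The following is a minimal sketch, not part of the original question: the class and method names (BroadcastLossDemo, CountReceivedAsync), the 10 ms delay, and the capacity of 1 are all made up for illustration, and it assumes the System.Threading.Tasks.Dataflow package is referenced. A fast producer feeds an unbounded BroadcastBlock that is linked to a slow, bounded ActionBlock.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BroadcastLossDemo
{
    // Hypothetical helper: sends `sent` integers through a BroadcastBlock
    // and returns how many of them the slow, bounded target processed.
    public static async Task<int> CountReceivedAsync(int sent)
    {
        var received = 0;

        // Slow consumer that holds at most one item at a time.
        var slowTarget = new ActionBlock<int>(
            async _ => { await Task.Delay(10); Interlocked.Increment(ref received); },
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        var broadcast = new BroadcastBlock<int>(x => x);
        broadcast.LinkTo(slowTarget, new DataflowLinkOptions { PropagateCompletion = true });

        // The broadcast block accepts every message without waiting for its target.
        for (var i = 0; i < sent; i++)
            await broadcast.SendAsync(i);

        broadcast.Complete();
        await slowTarget.Completion;
        return received;
    }

    public static async Task Main()
    {
        const int sent = 1000;
        var received = await CountReceivedAsync(sent);
        Console.WriteLine($"sent {sent}, received {received}");
    }
}
```

The exact count depends on timing, but on a typical run the target processes only a small fraction of what was sent.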

To fix that, you could create a custom block that behaves like BroadcastBlock but guarantees delivery to all targets. Doing that is not trivial, so you might be satisfied with a simpler variant (originally from my old answer):

public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(
    IEnumerable<ITargetBlock<T>> targets, DataflowBlockOptions options)
{
    // Copy the targets once so the sequence is not re-enumerated per item.
    var targetsList = targets.ToList();

    var block = new ActionBlock<T>(
        async item =>
        {
            // Offer the item to every target in turn. SendAsync waits while
            // a bounded target is full, so no target misses a message.
            foreach (var target in targetsList)
            {
                await target.SendAsync(item);
            }
        }, new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = options.BoundedCapacity,
            CancellationToken = options.CancellationToken
        });

    // There are no links here, so completion and faults have to be
    // propagated to the targets by hand.
    block.Completion.ContinueWith(task =>
    {
        foreach (var target in targetsList)
        {
            if (task.Exception != null)
                target.Fault(task.Exception);
            else
                target.Complete();
        }
    });

    return block;
}

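Usage in your case would be the following, replacing both the BroadcastBlock and the two bcBlock.LinkTo calls (the returned block is only a target, and it forwards to the buffers itself):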

var bcBlock = CreateGuaranteedBroadcastBlock(
    new[] { bufferBlock1, bufferBlock2 }, dbOptions);
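To check the result end to end, here is a cut-down sketch of the pipeline from the question built on this helper. The harness (GuaranteedBroadcastDemo, RunAsync, the 1 ms delay) is hypothetical, the buffer blocks are omitted for brevity, and the helper is repeated only so that the code compiles on its own. Note that each action runs once per batch, not once per item, so the counters should end at total divided by the batch size (rounded up, since a final partial batch is flushed on completion) rather than at total.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class GuaranteedBroadcastDemo
{
    // Same helper as in the answer, repeated so this sketch is self-contained.
    static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(
        IEnumerable<ITargetBlock<T>> targets, DataflowBlockOptions options)
    {
        var targetsList = targets.ToList();
        var block = new ActionBlock<T>(
            async item =>
            {
                foreach (var target in targetsList)
                    await target.SendAsync(item); // waits while a bounded target is full
            },
            new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = options.BoundedCapacity,
                CancellationToken = options.CancellationToken
            });

        block.Completion.ContinueWith(task =>
        {
            foreach (var target in targetsList)
            {
                if (task.Exception != null) target.Fault(task.Exception);
                else target.Complete();
            }
        });
        return block;
    }

    // Hypothetical harness: returns how many batches each branch processed.
    // batchSize must not exceed the BoundedCapacity of 10 used below.
    public static async Task<(int First, int Second)> RunAsync(int total, int batchSize)
    {
        var link = new DataflowLinkOptions { PropagateCompletion = true };
        var exec = new ExecutionDataflowBlockOptions { BoundedCapacity = 10 };
        var grouping = new GroupingDataflowBlockOptions { BoundedCapacity = 10 };
        int first = 0, second = 0;

        var action1 = new ActionBlock<string[]>(
            async _ => { await Task.Delay(1); Interlocked.Increment(ref first); }, exec);
        var action2 = new ActionBlock<string[]>(
            async _ => { await Task.Delay(1); Interlocked.Increment(ref second); }, exec);

        var batch1 = new BatchBlock<string>(batchSize, grouping);
        var batch2 = new BatchBlock<string>(batchSize, grouping);
        batch1.LinkTo(action1, link);
        batch2.LinkTo(action2, link);

        var broadcast = CreateGuaranteedBroadcastBlock(
            new[] { batch1, batch2 }, new DataflowBlockOptions { BoundedCapacity = 10 });

        // SendAsync waits here whenever the bounded pipeline is full.
        for (var i = 1; i <= total; i++)
            await broadcast.SendAsync(i.ToString());

        broadcast.Complete();
        await Task.WhenAll(action1.Completion, action2.Completion);
        return (first, second);
    }
}
```

With this arrangement the producer loop is throttled by the slowest branch, which is the back-pressure the question expected from BoundedCapacity.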
svick answered Nov 03 '22 01:11
