
Unexpected Behaviour - TPL Dataflow BatchBlock Rejects Items While TriggerBatch Is Executing

When you create a BatchBlock with bounded capacity and call TriggerBatch() in parallel with posting a new item, the post fails while TriggerBatch() is executing.

TriggerBatch() is called periodically (every X amount of time) to ensure that data is not held in the block for too long when the incoming data stream pauses or slows down.

The following code reproduces the problem and prints "Failed to Post" for some of the items:

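    using System;
    using System.Linq;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public static void Main(string[] args)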
    {
        var batchBlock = new BatchBlock<int>(10, new GroupingDataflowBlockOptions() { BoundedCapacity = 10000000 });
        var actionBlock = new ActionBlock<int[]>(x => ProcessBatch(x), new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });
        batchBlock.LinkTo(actionBlock);

        var producerTask = Task.Factory.StartNew(() =>
        {
            //Post 10K Items
            for (int i = 0; i < 10000; i++)
            {
                var postResult = batchBlock.Post(i);
                if (!postResult)
                    Console.WriteLine("Failed to Post");
            }
        });

        var triggerBatchTask = Task.Factory.StartNew(() =>
        {
            //Trigger Batch..
            for (int i = 0; i < 1000000; i++)
                batchBlock.TriggerBatch();
        });

        producerTask.Wait();
        triggerBatchTask.Wait();
    }

    public static void ProcessBatch(int[] batch)
    {
        Console.WriteLine("{0} - {1}", batch.First(), batch.Last());
    }

Note that this scenario is reproducible only when the BatchBlock is bounded.

Am I missing something, or is this an issue with BatchBlock?

Asked by Al Yaros, Feb 25 '16 12:02


1 Answer

The BatchBlock does not really reject the item; it attempts to postpone it. With Post(), however, postponing is not an option, so the call returns false instead. A simple fix is to use await batchBlock.SendAsync(i) instead of batchBlock.Post(i) (this also means changing Task.Factory.StartNew(() => to Task.Run(async () =>).
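A minimal sketch of that change, applied to the code from the question. It assumes C# 7.1 or later for `async Main`; the `Program` class wrapper, the completion handling at the end, and the "Failed to Send" message are additions for illustration and are not part of the original code.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class Program
{
    public static async Task Main()
    {
        var batchBlock = new BatchBlock<int>(10, new GroupingDataflowBlockOptions { BoundedCapacity = 10000000 });
        var actionBlock = new ActionBlock<int[]>(
            batch => Console.WriteLine("{0} - {1}", batch.First(), batch.Last()),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

        // Addition for illustration: propagate completion so the pipeline can be drained below.
        batchBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

        var producerTask = Task.Run(async () =>
        {
            for (int i = 0; i < 10000; i++)
            {
                // Unlike Post(), SendAsync() allows the block to postpone the item;
                // the returned task completes once the item is accepted or declined.
                bool accepted = await batchBlock.SendAsync(i);
                if (!accepted)
                    Console.WriteLine("Failed to Send");
            }
        });

        var triggerBatchTask = Task.Run(() =>
        {
            for (int i = 0; i < 1000000; i++)
                batchBlock.TriggerBatch();
        });

        await Task.WhenAll(producerTask, triggerBatchTask);

        // Addition for illustration: flush any remaining items and wait for processing to finish.
        batchBlock.Complete();
        await actionBlock.Completion;
    }
}
```

SendAsync() can still return false, for example if the block has been completed, so checking the result remains worthwhile.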

Why does this happen? According to the source code, if the BatchBlock is bounded, TriggerBatch() is processed asynchronously, and while it is being processed, no new items are accepted.

In any case, you shouldn't expect Post() to always return true on a bounded block: if the block is full, Post() will also return false.
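As a small illustration of the full-block case, here is a sketch that uses a BufferBlock rather than a BatchBlock (a substitution made only to keep the example short), with a capacity of 1 and no consumer linked. The class name is hypothetical.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

public static class BoundedPostDemo
{
    public static void Main()
    {
        // Capacity of 1 and nothing consuming from the block, so it fills up after one item.
        var block = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });

        Console.WriteLine(block.Post(1)); // True: there is room for the first item
        Console.WriteLine(block.Post(2)); // False: the block is full and Post() cannot wait
    }
}
```

Code that must not lose items on a bounded block therefore either checks the return value of Post() or awaits SendAsync().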

Answered by svick, Sep 28 '22 15:09