Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

"bounded" BatchBlock => ActionBlock. How to complete the proper way?

I'm trying to use a bounded batchblock linked to an action block. I know when the feeding of items in the batchblock end and I want to trigger a completion chain.

The problem is: if my BatchBlock<T> is of a given BoundedCapacity I won't get all my items fired in the action block.

Here is a sample of my problem, it should (well in my understanding of TPL dataflow...) print 0 to 124 but it ends up printing 0 to 99.

There must be something I'm missing... Maybe BoundedCapacity means "drop items when queue count is over xxx..." if so how can I achieve a guaranteed maximum memory consumption?

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks.Dataflow;

namespace ConsoleApplication
{
    class Program
    {
        static void Main(string[] args)
        {
            int itemsCount = 125;
            List<int> ints = new List<int>(itemsCount);
            for (int i = 0; i < itemsCount; i++)
                ints.Add(i);

            BatchBlock<int> batchBlock = new BatchBlock<int>(50,new GroupingDataflowBlockOptions(){BoundedCapacity = 100});
            ActionBlock<int[]> actionBlock = new ActionBlock<int[]>(intsBatch =>
            {
                Thread.Sleep(1000);
                foreach (int i in intsBatch)
                    Console.WriteLine(i);               
            });
            batchBlock.LinkTo(actionBlock, new DataflowLinkOptions() { PropagateCompletion = true });

            // feed the batch block
            foreach (int i in ints)
                batchBlock.Post(i);
            // Don't know how to end the proper way... Meaning it should display 0 to 124 and not 0 to 99
            batchBlock.Complete();
            batchBlock.TriggerBatch();
            actionBlock.Completion.Wait();
        }
    }
}
like image 914
Jeremy D Avatar asked Jul 16 '15 09:07

Jeremy D


1 Answers

Post on a block doesn't always succeed. It tries to post a message to the block but if the BoundedCapacity was reached it will fail and return false.

What you can do is use SendAsync instead which returns an awaitable task. If the block has room for your message it completes asynchronously. If it doesn't then the block returns a task that will complete when it does have room to accept a new message. You can await that task and throttle your insertions:

async Task MainAsync()
{
    var ints = Enumerable.Range(0, 125).ToList();
    var batchBlock = new BatchBlock<int>(50, new GroupingDataflowBlockOptions { BoundedCapacity = 100 });
    var actionBlock = new ActionBlock<int[]>(intsBatch =>
    {
        Thread.Sleep(1000);
        foreach (var i in intsBatch)
            Console.WriteLine(i);
    });
    batchBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

    foreach (var i in ints)
        await batchBlock.SendAsync(i); // wait synchronously for the block to accept.

    batchBlock.Complete();
    await actionBlock.Completion;
}
like image 199
i3arnon Avatar answered Sep 29 '22 23:09

i3arnon