I'm trying to use a bounded BatchBlock linked to an ActionBlock. I know when the feeding of items into the BatchBlock ends, and at that point I want to trigger a completion chain. The problem is: if my BatchBlock<T> has a given BoundedCapacity, I won't get all my items fired into the action block.
Here is a sample of my problem; it should (well, in my understanding of TPL Dataflow...) print 0 to 124, but it ends up printing 0 to 99.
There must be something I'm missing... Maybe BoundedCapacity means "drop items when the queue count is over xxx"? If so, how can I achieve a guaranteed maximum memory consumption?
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks.Dataflow;

namespace ConsoleApplication
{
    class Program
    {
        static void Main(string[] args)
        {
            int itemsCount = 125;
            List<int> ints = new List<int>(itemsCount);
            for (int i = 0; i < itemsCount; i++)
                ints.Add(i);

            BatchBlock<int> batchBlock = new BatchBlock<int>(50, new GroupingDataflowBlockOptions() { BoundedCapacity = 100 });
            ActionBlock<int[]> actionBlock = new ActionBlock<int[]>(intsBatch =>
            {
                Thread.Sleep(1000);
                foreach (int i in intsBatch)
                    Console.WriteLine(i);
            });
            batchBlock.LinkTo(actionBlock, new DataflowLinkOptions() { PropagateCompletion = true });

            // feed the batch block
            foreach (int i in ints)
                batchBlock.Post(i);

            // Don't know how to end the proper way... Meaning it should display 0 to 124 and not 0 to 99
            batchBlock.Complete();
            batchBlock.TriggerBatch();

            actionBlock.Completion.Wait();
        }
    }
}
Post on a block doesn't always succeed. It tries to post a message to the block, but if the BoundedCapacity has been reached it will fail and return false.
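For illustration, here's a minimal sketch of that behavior (the BufferBlock, the capacity of 2, and the variable names are just placeholders, not part of your code): with a bounded block that nothing is draining, Post starts returning false as soon as the capacity is full.

var buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 2 });
Console.WriteLine(buffer.Post(1)); // True  - accepted
Console.WriteLine(buffer.Post(2)); // True  - accepted, capacity now full
Console.WriteLine(buffer.Post(3)); // False - capacity reached, item is rejected

In your sample the rejected items are simply lost, which is why only 0 to 99 ever reach the action block.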
What you can do is use SendAsync instead, which returns an awaitable task. If the block has room for your message, the task completes immediately. If it doesn't, the block returns a task that will complete when it does have room to accept a new message. You can await that task and throttle your insertions:
async Task MainAsync()
{
    var ints = Enumerable.Range(0, 125).ToList();

    var batchBlock = new BatchBlock<int>(50, new GroupingDataflowBlockOptions { BoundedCapacity = 100 });
    var actionBlock = new ActionBlock<int[]>(intsBatch =>
    {
        Thread.Sleep(1000);
        foreach (var i in intsBatch)
            Console.WriteLine(i);
    });
    batchBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

    foreach (var i in ints)
        await batchBlock.SendAsync(i); // asynchronously wait for the block to accept the item.

    batchBlock.Complete();
    await actionBlock.Completion;
}
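Note that calling Complete() on the BatchBlock also flushes any remaining items as a final, smaller batch, so the explicit TriggerBatch() from your sample isn't needed here. To run this from a console app, a minimal sketch of the entry point (assuming MainAsync above is declared static; blocking via GetAwaiter().GetResult() is just one option, with C# 7.1+ you could use an async Main instead):

static void Main(string[] args)
{
    // Block the synchronous entry point on the async pipeline above.
    MainAsync().GetAwaiter().GetResult();
}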