Is there a way to batch a collection of items from the blocking collection. E.G.
I have a messaging bus publisher calling blockingCollection.Add()
And a consuming thread which is created like this:
Task.Factory.StartNew(() =>
{
foreach (string value in blockingCollection.GetConsumingEnumerable())
{
Console.WriteLine(value);
}
});
However, I only want the Console to write after the blocking collection has 10 items on it, whereas GetConsumingEnumerable() always fires after each item is added. I could write my own queue for this but I'd like to use the blocking collection if possible?
Not sure what the project requirements are but I'd recommend TPL DataFlow BatchBlock.
You would instantiate a BatchBlock<string>
, bind it to an ActionBlock<string>
and then post to the batch block.
A pseudo code might look something like this:
var bb = new BatchBlock<string>(10);
var ab = new ActionBlock<string[]>(msgArray=>{
foreach(var msg in msgArray)
Console.Writeline(msg);
});
bb.LinkTo(ab);
foreach (string value in blockingCollection.GetConsumingEnumerable())
{
bb.Post(value);
}
Using DataFlow you might even want to replace the BlockingCollection with a BufferBlock or just post to the buffer block directly without first adding to the blocking collection, since batch block is already thread-safe.
A quick solution would be something like this
public class ConsoleQueue
{
private readonly List<string> _values = new List<string>();
public void FlushQueueIfFull()
{
if (_values.Count < 10) return;
foreach (var value in _values)
{
Console.WriteLine(value);
}
_values.Clear();
}
public void Push(string message)
{
_values.Add(message);
FlushQueueIfFull();
}
}
then you can use it like this
var queue = new ConsoleQueue();
Task.Factory.StartNew(() =>
{
foreach (string value in blockingCollection.GetConsumingEnumerable())
{
queue.Push(value);
}
});
You can easily extend it to cover thread safety etc
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With