 

BlockingCollection<T> batching using TPL Dataflow [duplicate]

Is there a way to consume items from a BlockingCollection in batches? For example:

I have a message bus publisher that calls blockingCollection.Add(), and a consumer thread created like this:

Task.Factory.StartNew(() =>
{
    foreach (string value in blockingCollection.GetConsumingEnumerable())
    {
        Console.WriteLine(value);
    }
});

However, I only want to write to the console once 10 items have accumulated in the collection, whereas GetConsumingEnumerable() yields each item as soon as it is added. I could write my own queue for this, but I'd rather use BlockingCollection if possible.

asked Feb 11 '14 14:02 by The Unculled Badger



2 Answers

I'm not sure what your project requirements are, but I'd recommend the TPL Dataflow BatchBlock.

You would instantiate a BatchBlock<string>, link it to an ActionBlock<string[]> (a BatchBlock emits arrays of items), and then post to the batch block.

In code it might look something like this:

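using System.Threading.Tasks.Dataflow;

var bb = new BatchBlock<string>(10);
var ab = new ActionBlock<string[]>(msgArray =>
{
    foreach (var msg in msgArray)
        Console.WriteLine(msg);
});

// Propagate completion so ab finishes after bb has emitted its last batch
bb.LinkTo(ab, new DataflowLinkOptions { PropagateCompletion = true });

// Blocks until CompleteAdding() is called on the collection
foreach (string value in blockingCollection.GetConsumingEnumerable())
{
    bb.Post(value);
}

// Flush any leftover items (fewer than 10) as a final batch, then wait for ab
bb.Complete();
ab.Completion.Wait();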

With Dataflow you might even want to replace the BlockingCollection with a BufferBlock, or simply post to the BatchBlock directly without first adding items to the BlockingCollection, since BatchBlock is already thread-safe.
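A minimal sketch of the "post directly" option, assuming the publisher can call Post on the BatchBlock instead of blockingCollection.Add(). The DataflowBatchingDemo class and its RunAsync method are hypothetical names for illustration; BatchBlock, ActionBlock, DataflowLinkOptions.PropagateCompletion, Complete() and Completion are the actual TPL Dataflow types and members (System.Threading.Tasks.Dataflow). The method also collects the batches it sees so the grouping can be checked.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowBatchingDemo
{
    // Hypothetical helper: groups messages into batches of batchSize,
    // writes each message to the console, and returns the batches in order.
    public static async Task<List<string[]>> RunAsync(IEnumerable<string> messages, int batchSize)
    {
        var received = new List<string[]>();

        var batchBlock = new BatchBlock<string>(batchSize);
        var actionBlock = new ActionBlock<string[]>(batch =>
        {
            // ActionBlock processes one batch at a time by default
            // (MaxDegreeOfParallelism = 1), so this Add is not concurrent.
            received.Add(batch);
            foreach (var msg in batch)
                Console.WriteLine(msg);
        });

        // actionBlock completes once batchBlock completes and has
        // handed over its final batch.
        batchBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

        // The publisher posts here instead of calling blockingCollection.Add().
        foreach (var msg in messages)
            batchBlock.Post(msg);

        batchBlock.Complete();          // leftover items go out as a smaller final batch
        await actionBlock.Completion;   // wait until every batch has been processed
        return received;
    }
}
```

With 25 messages and a batch size of 10 this yields batches of 10, 10 and 5. Without the Complete() call, the last 5 messages would stay buffered in the BatchBlock indefinitely.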

answered Nov 14 '22 21:11 by Dimitri



A quick solution would be something like this:

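using System;
using System.Collections.Generic;

public class ConsoleQueue
{
    // Pending messages; not synchronized, so only call Push from one thread
    private readonly List<string> _values = new List<string>();

    // Writes and clears the buffer once it holds 10 messages
    public void FlushQueueIfFull()
    {
        if (_values.Count < 10) return;
        foreach (var value in _values)
        {
            Console.WriteLine(value);
        }
        _values.Clear();
    }

    // Buffers a message; fewer than 10 messages stay buffered until more arrive
    public void Push(string message)
    {
        _values.Add(message);
        FlushQueueIfFull();
    }
}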

Then you can use it like this:

var queue = new ConsoleQueue();

Task.Factory.StartNew(() =>
{
    foreach (string value in blockingCollection.GetConsumingEnumerable())
    {
        queue.Push(value);
    }
});

You can easily extend it to cover thread safety, etc.
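One way to extend this while staying with BlockingCollection, as the question asks, is to do the batching inside the single consumer. This is a sketch assuming exactly one consuming thread; GetConsumingBatches is a hypothetical extension method, not part of the .NET class library. BlockingCollection already makes Add() safe to call from the publisher's threads, and the batch list is only touched by the consumer, so no extra locking is needed. Unlike the ConsoleQueue above, it also emits the final partial batch once CompleteAdding() is called.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class BlockingCollectionBatchExtensions
{
    // Hypothetical extension method: groups the items returned by
    // GetConsumingEnumerable() into lists of up to batchSize items.
    public static IEnumerable<List<T>> GetConsumingBatches<T>(
        this BlockingCollection<T> source, int batchSize)
    {
        if (batchSize <= 0)
            throw new ArgumentOutOfRangeException(nameof(batchSize));

        var batch = new List<T>(batchSize);

        // Blocks while the collection is empty; ends after CompleteAdding()
        // has been called and all remaining items have been taken.
        foreach (var item in source.GetConsumingEnumerable())
        {
            batch.Add(item);
            if (batch.Count == batchSize)
            {
                yield return batch;
                batch = new List<T>(batchSize); // caller keeps the yielded list
            }
        }

        // Emit leftovers (fewer than batchSize) once adding has completed.
        if (batch.Count > 0)
            yield return batch;
    }
}
```

The consumer from the question would then loop over blockingCollection.GetConsumingBatches(10) and write each item of every batch. One trade-off: a partial batch waits until either the tenth item or CompleteAdding() arrives, so if messages trickle in slowly you may prefer a variant that uses TryTake with a timeout.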

answered Nov 14 '22 22:11 by DrinkBird
