Multiple consumers and querying a C# BlockingCollection

Tags: c#, .net, c#-4.0

I am using a .NET 4.0 BlockingCollection to hold a queue of items, each of which must be processed by an operation that can take up to a second. Different threads can add items to this queue.

I have a couple of questions about this:

a) Can multiple consumers work on this BlockingCollection? I noticed GetConsumingEnumerable(), which seems to be intended for single-consumer scenarios. The reason for wanting multiple consumers is that the processing, via a named pipe instance, can handle up to three of these items at a time, so I thought I could have three consumers.

b) Is there a way to check whether a given item is on this queue and, if it is, to make the caller that checks block until that item has been processed?

EDIT:

Based on Jon Skeet's answer, here's some sample code to illustrate multiple consumers acting on a BlockingCollection populated by a single producer, with the consumers using GetConsumingEnumerable():

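using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static BlockingCollection<string> coll = new BlockingCollection<string>();

    static void Consume()
    {
        // Blocks while the collection is empty; each item goes to exactly one consumer.
        foreach (var i in coll.GetConsumingEnumerable())
        {
            Console.WriteLine("Thread {0} Consuming: {1}", Thread.CurrentThread.ManagedThreadId, i);
            Thread.Sleep(1000); // simulate up to a second of processing
        }
    }

    static void Main(string[] args)
    {
        int item = 0;

        // Single producer: adds a new item every 500 ms.
        Task.Factory.StartNew(() =>
        {
            while (true)
            {
                coll.Add(string.Format("Item {0}", item++));
                Thread.Sleep(500);
            }
        });

        // Two consumers sharing the same collection.
        for (int i = 0; i < 2; i++)
        {
            Task.Factory.StartNew(() => Consume());
        }

        // Keep the process alive without spinning a CPU core.
        Thread.Sleep(Timeout.Infinite);
    }
}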

The two consumers, running on two different threads, process the items in an interleaved manner, e.g.:

Thread 4 Consuming: Item 0
Thread 5 Consuming: Item 1
Thread 4 Consuming: Item 2
Thread 5 Consuming: Item 3
Thread 4 Consuming: Item 4

pkiddie asked Sep 23 '11 11:09


2 Answers

Multiple consumers can just call Take or TryTake concurrently - each item will only be consumed by a single consumer.
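To make that concrete, here is a minimal sketch of three consumers calling TryTake concurrently on one collection. The class name `TakeDemo`, the helper `ConsumeWithTake`, and the item counts are made up for illustration; the assumption is that the producer calls CompleteAdding() once it is done, so the consumers have a way to stop.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class TakeDemo
{
    // Hypothetical helper: starts `consumerCount` consumers that call TryTake
    // concurrently and returns every item they consumed.
    public static string[] ConsumeWithTake(int itemCount, int consumerCount)
    {
        var queue = new BlockingCollection<string>();
        var consumed = new ConcurrentBag<string>();

        var consumers = Enumerable.Range(0, consumerCount)
            .Select(_ => Task.Factory.StartNew(() =>
            {
                string item;
                // Blocks until an item arrives; returns false once the
                // collection is marked complete and has been drained.
                while (queue.TryTake(out item, Timeout.Infinite))
                {
                    consumed.Add(item);
                }
            }, TaskCreationOptions.LongRunning))
            .ToArray();

        for (int i = 0; i < itemCount; i++)
        {
            queue.Add("Item " + i);
        }
        queue.CompleteAdding(); // lets the blocked consumers exit their loops

        Task.WaitAll(consumers);
        return consumed.ToArray();
    }

    static void Main()
    {
        var items = ConsumeWithTake(10, 3);
        Console.WriteLine("Consumed {0} items, {1} distinct",
            items.Length, items.Distinct().Count());
    }
}
```

If each item really is handed to only one consumer, the total and the distinct count should both equal the number of items added.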

However, I believe GetConsumingEnumerable will also do what you want. I believe if each caller calls that, each will get a separate consuming enumerable, which again will make sure that each item is only consumed once. I'm not sure offhand what happens when the queue becomes empty - I don't know whether MoveNext() then blocks, or returns false.

I didn't really follow your second question though...

Jon Skeet answered Sep 28 '22 23:09


GetConsumingEnumerable is in fact safe to call from multiple consumers simultaneously; the enumerable only completes when the collection is marked as complete. Each item is only consumed once.

GetConsumingEnumerable is essentially equivalent to:

while (!IsCompleted)
{
  if (TryTake(out var item, Timeout.Infinite))
    yield return item;
}

plus a bit of cancellation/cleanup logic.
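As a rough illustration of the completion behaviour, the sketch below runs several consumers over GetConsumingEnumerable() and lets their foreach loops end by calling CompleteAdding(). The names `CompletionDemo` and `ConsumeUntilComplete` and the counts are hypothetical, chosen only for this example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

static class CompletionDemo
{
    // Hypothetical helper: returns how many items each consumer handled.
    public static int[] ConsumeUntilComplete(int itemCount, int consumerCount)
    {
        var queue = new BlockingCollection<int>();

        var consumers = Enumerable.Range(0, consumerCount)
            .Select(_ => Task.Factory.StartNew(() =>
            {
                int handled = 0;
                // MoveNext() blocks while the queue is empty; the loop ends only
                // after CompleteAdding() has been called and the queue is drained.
                foreach (var item in queue.GetConsumingEnumerable())
                {
                    handled++;
                }
                return handled;
            }, TaskCreationOptions.LongRunning))
            .ToArray();

        for (int i = 0; i < itemCount; i++)
        {
            queue.Add(i);
        }
        queue.CompleteAdding(); // without this, the consumers would block forever

        Task.WaitAll(consumers);
        return consumers.Select(t => t.Result).ToArray();
    }

    static void Main()
    {
        var counts = ConsumeUntilComplete(20, 3);
        Console.WriteLine("Per-consumer counts: {0} (total {1})",
            string.Join(", ", counts), counts.Sum());
    }
}
```

How the items are split between the consumers varies from run to run, but the per-consumer counts should always add up to the number of items added.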

Stephen Cleary answered Sep 28 '22 23:09