I am using a .NET 4.0 BlockingCollection to handle a queue of items that each need to be processed by an operation that can take up to a second to process each item. This queue of items can be added to by different threads.
I have a couple of questions regarding this a) allowing multiple consumers to work on this BlockingCollection? I noticed GetConsumingEnumerable(), which seems to be applicable for single consumer scenarios. The reason for having multiple consumers is that the processing, via a named pipe instance, can process up to three of these items at a time, so I thought I could have three consumers.
b) Is there a way of checking to see if an item is on this queue, and if so, getting the caller that checks to see if there is an item to block until the item has been processed?
EDIT:
Based on Jon Skeet's answer here's some sample code to illustrate multiple consumers acting on a BlockingCollection populated by a single producer, with consumers using GetConsumingEnumerable()
:
static BlockingCollection<string> coll = new BlockingCollection<string>();
static void Consume()
{
foreach (var i in coll.GetConsumingEnumerable())
{
Console.WriteLine(String.Format("Thread {0} Consuming: {1}", Thread.CurrentThread.ManagedThreadId, i));
Thread.Sleep(1000);
}
}
static void Main(string[] args)
{
int item = 0;
Task.Factory.StartNew(() =>
{
while (true)
{
coll.Add(string.Format("Item {0}", item++));
Thread.Sleep(500);
}
});
for (int i = 0; i < 2; i++)
{
Task.Factory.StartNew(() => Consume());
}
while (true) ;
}
The items are processed in an interleaved manner between the two consumers operating on the two different threads, e.g.
Thread 4 Consuming: Item 0
Thread 5 Consuming: Item 1
Thread 4 Consuming: Item 2
Thread 5 Consuming: Item 3
Thread 4 Consuming: Item 4
Multiple consumers can just call Take
or TryTake
concurrently - each item will only be consumed by a single consumer.
However, I believe GetConsumingEnumerable
will also do what you want. I believe if each caller calls that, each will get a separate consuming enumerable, which again will make sure that each item is only consumed once. I'm not sure offhand what happens when the queue becomes empty - I don't know whether MoveNext()
then blocks, or returns false.
I didn't really follow your second question though...
GetConsumingEnumerable
is in fact safe to call from multiple consumers simultaneously; the enumerable only completes when the collection is marked as complete. Each item is only consumed once.
GetConsumingEnumerable
is essentially equivalent to:
while (!IsCompleted)
{
if (TryTake(out var item, Timeout.Infinite))
yield return item;
}
plus a bit of cancellation/cleanup logic.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With