I have a BlockingCollection(ConcurrentBag, 50000) where I am trying to use a very small Bounded Capacity of 50,000 for the producer threads in order to maximize the number of records I can process in my consumer thread's ConcurrentDictionary. The producer is much, much faster than the consumer and will consume most of the memory otherwise.
Unfortunately, I immediately noticed that the total number of records in my ConcurrentDictionary is now substantially lower than it should be after adding the bounded capacity of 50,000 when my test data executes. I read that the BlockingCollection's .add method should block indefinitely until there is space in the collection for the add to execute. However, this does not appear to be the case.
Questions:
Will a BlockingCollection's .add method eventually time out or silently fail if too many add's are called before capacity in the BlockingCollection frees up?
If the answer to #1 is yes, how many adds can I attempt after the Bounding Capacity has been exceeded without losing data?
If many BlockingCollection .add() methods are called which are waiting / blocking for capacity and the CompleteAdding() method is called, will those waiting / blocking adds continue to wait and then eventually add or do they silently fail?
Make sure that, if you are using a BlockingCollection together with a ConcurrentDictionary that you do not have a BlockingCollection.TryAdd(myobject) method hidden in your code somewhere and are mistaking it for the ConcurrentDictionary.TryAdd() method. The BlockingCollection.TryAdd(myobject) will return false and discard the add request producing a "silent fail", if the BlockingCollection's Bounding Capacity has been exceeded.
A Final Note Regarding Performance
It appears that (in my own case anyways) using a Bounding Capacity and .Add() on a BlockingCollection is very slow compared to using no Bounding Capacity and .TryAdd() in the same process.
I achieved much better performance results by implementing my own Bounding Capacity strategy. There are many ways to do this. Three choices include Thread.Sleep(), Thread.Spinwait(), or Monitor.Wait() used together with Monitor.PulseAll(). When one of these strategies are used, it is possible to also use BlockingCollection.TryAdd() instead of BlockingCollection.Add() and have NO Bounding Capacity without losing any data or running out of memory. This method also seems to yield better performance.
You can choose from the three examples based on which scenario works best for the speed differences in your Producer and Consumer threads.
Thread.Wait() Example:
//Check to see if the BlockingCollection's bounded capacity has been exceeded.
while (Tokens.Count > 50000)
{ //If the bounded capacity has been exceeded
//place the thread in wait mode
Thread.Sleep(SleepTime);
}
Thread.SpinWait() Example:
//Check to see if the BlockingCollection's bounded capacity has been exceeded.
while (Tokens.Count > 50000)
{ //If the capacity has been exceeded
//place the thread in wait mode
Thread.SpinWait(SpinCount);
}
Monitor.Wait() Example
This example requires a hook in both the Producer and Consumer sides.
Producer Code
//Check to see BlockingCollection capacity has been exceeded.
if (Tokens.Count > 50000)
{
lock (syncLock)
{ //Double check before waiting
if (Tokens.Count > 50000)
{
Monitor.Wait(syncLock, 1000);
}
}
}
Consumer Code
//Check to see BlockingCollection capacity is back a normal range.
if (Tokens.Count <= 40000)
{
lock (syncLock)
{ //Double check before waiting
if (Tokens.Count < 40000)
{
Monitor.PulseAll(syncLock);
}
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With