Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Can Bounded BlockingCollections Lose Data During Adds

I have a BlockingCollection(ConcurrentBag, 50000) where I am trying to use a very small Bounded Capacity of 50,000 for the producer threads in order to maximize the number of records I can process in my consumer thread's ConcurrentDictionary. The producer is much, much faster than the consumer and will consume most of the memory otherwise.

Unfortunately, I immediately noticed that the total number of records in my ConcurrentDictionary is now substantially lower than it should be after adding the bounded capacity of 50,000 when my test data executes. I read that the BlockingCollection's .add method should block indefinitely until there is space in the collection for the add to execute. However, this does not appear to be the case.

Questions:

  1. Will a BlockingCollection's .add method eventually time out or silently fail if too many add's are called before capacity in the BlockingCollection frees up?

  2. If the answer to #1 is yes, how many adds can I attempt after the Bounding Capacity has been exceeded without losing data?

  3. If many BlockingCollection .add() methods are called which are waiting / blocking for capacity and the CompleteAdding() method is called, will those waiting / blocking adds continue to wait and then eventually add or do they silently fail?

like image 849
Jake Drew Avatar asked Oct 31 '12 03:10

Jake Drew


1 Answers

Make sure that, if you are using a BlockingCollection together with a ConcurrentDictionary that you do not have a BlockingCollection.TryAdd(myobject) method hidden in your code somewhere and are mistaking it for the ConcurrentDictionary.TryAdd() method. The BlockingCollection.TryAdd(myobject) will return false and discard the add request producing a "silent fail", if the BlockingCollection's Bounding Capacity has been exceeded.

  1. The BlockingCollection's .Add() method does not appear to "silenty fail" or lose add's after exceeding the Bounding Capacity by a large amount. The add() method will eventually cause the process to run out of memory, if too many .add()'s are waiting to be added to a BlockingCollection that is over capacity. (This would have to be a very extreme case of flow-control issues)
  2. See #1.
  3. My own tests seem to indicate that once the CompleteAdding() method is called, all subsequent adds fail as described in the MSDN docs.

A Final Note Regarding Performance

It appears that (in my own case anyways) using a Bounding Capacity and .Add() on a BlockingCollection is very slow compared to using no Bounding Capacity and .TryAdd() in the same process.

I achieved much better performance results by implementing my own Bounding Capacity strategy. There are many ways to do this. Three choices include Thread.Sleep(), Thread.Spinwait(), or Monitor.Wait() used together with Monitor.PulseAll(). When one of these strategies are used, it is possible to also use BlockingCollection.TryAdd() instead of BlockingCollection.Add() and have NO Bounding Capacity without losing any data or running out of memory. This method also seems to yield better performance.

You can choose from the three examples based on which scenario works best for the speed differences in your Producer and Consumer threads.

Thread.Wait() Example:

//Check to see if the BlockingCollection's bounded capacity has been exceeded.
while (Tokens.Count > 50000)
{   //If the bounded capacity has been exceeded
    //place the thread in wait mode 
    Thread.Sleep(SleepTime);
}

Thread.SpinWait() Example:

//Check to see if the BlockingCollection's bounded capacity has been exceeded.
while (Tokens.Count > 50000)
{   //If the capacity has been exceeded
    //place the thread in wait mode 
    Thread.SpinWait(SpinCount);
}  

Monitor.Wait() Example

This example requires a hook in both the Producer and Consumer sides.

Producer Code

//Check to see BlockingCollection capacity has been exceeded.
if (Tokens.Count > 50000)
{ 
    lock (syncLock)
    {   //Double check before waiting
        if (Tokens.Count > 50000)
        {
            Monitor.Wait(syncLock, 1000);
        }
    }
}

Consumer Code

//Check to see BlockingCollection capacity is back a normal range.
if (Tokens.Count <= 40000)
{ 
    lock (syncLock)
    {   //Double check before waiting
        if (Tokens.Count < 40000)
        {
            Monitor.PulseAll(syncLock);
        }
    }
}
like image 167
Jake Drew Avatar answered Sep 20 '22 02:09

Jake Drew