
Avoiding stale (logically corrupt) data when using "ConcurrentDictionary.GetOrAdd()", Repro code included

The bottom of this article describes how using GetOrAdd may cause (if I understand it correctly) corrupt/unexpected results.

snip/

ConcurrentDictionary is designed for multithreaded scenarios. You do not have to use locks in your code to add or remove items from the collection. However, it is always possible for one thread to retrieve a value, and another thread to immediately update the collection by giving the same key a new value.

Also, although all methods of ConcurrentDictionary are thread-safe, not all methods are atomic, specifically GetOrAdd and AddOrUpdate. The user delegate that is passed to these methods is invoked outside of the dictionary's internal lock. (This is done to prevent unknown code from blocking all threads.) Therefore it is possible for this sequence of events to occur:

1) threadA calls GetOrAdd, finds no item and creates a new item to Add by invoking the valueFactory delegate.

2) threadB calls GetOrAdd concurrently, its valueFactory delegate is invoked and it arrives at the internal lock before threadA, and so its new key-value pair is added to the dictionary.

3) threadA's user delegate completes, and the thread arrives at the lock, but now sees that the item exists already

4) threadA performs a "Get", and returns the data that was previously added by threadB.

Therefore, it is not guaranteed that the data that is returned by GetOrAdd is the same data that was created by the thread's valueFactory. A similar sequence of events can occur when AddOrUpdate is called.
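One commonly used way to live with this behavior is to store `Lazy<T>` wrappers rather than the values themselves. The following is a minimal sketch, not part of the quoted article: the class name `LazyGetOrAddSketch`, the key `42`, and the `factoryRuns` counter are hypothetical and exist only to make the effect observable. Several threads may still each build a wrapper, but only the stored wrapper's body is ever executed.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class LazyGetOrAddSketch
{
    // Hypothetical counter: records how often the expensive factory body runs.
    private static int factoryRuns;

    public static int CountFactoryRuns(int workers)
    {
        factoryRuns = 0;
        var cache = new ConcurrentDictionary<int, Lazy<string>>();

        Parallel.For(0, workers, i =>
        {
            // valueFactory may be invoked by several threads, so several Lazy
            // wrappers may be constructed. Only one of them is stored, and
            // GetOrAdd hands that stored wrapper to every caller.
            Lazy<string> entry = cache.GetOrAdd(
                42,
                key => new Lazy<string>(
                    () =>
                    {
                        Interlocked.Increment(ref factoryRuns);
                        return "value for " + key;
                    },
                    LazyThreadSafetyMode.ExecutionAndPublication));

            // Forces the computation, or waits for the thread already running it.
            string value = entry.Value;
        });

        return factoryRuns;
    }
}
```

The wrappers that lose the race are discarded without their `Value` ever being read, so the expensive part runs once per key. This only addresses duplicate creation; it does not make later modifications of the stored object atomic.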

Question

What is the correct way to verify the data and retry the update? A nice approach would be an extension method that tries/retries this operation based on the contents of the old value.

How would this be implemented? Can I rely on the result (verify) as a valid-end-state, or must I retry and re-retrieve the values using a different method?

Code

The following code has a race condition when updating the values. The desired behavior is that AddOrUpdateWithoutRetrieving() will increment various values in different ways (using ++ or Interlocked.Increment()).

I also want to perform multiple field operations in a single unit and retry the update if the previous update didn't "take" due to a race condition.

Run the code and you will see each value printed to the console start out increasing by one per iteration, but the values then drift apart, with some ending up a few iterations ahead of or behind the others.

namespace DictionaryHowTo
{
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;

    // Driver class for the repro. The value type stored in the dictionary
    // is the nested TestData class further down.
    class FilterConcurrentDuplicate
    {
        // Create a new concurrent dictionary.
        readonly ConcurrentDictionary<int, TestData> eventLogCache = 
             new ConcurrentDictionary<int, TestData>();

        static void Main()
        {
            FilterConcurrentDuplicate c = new FilterConcurrentDuplicate();

            c.DoRace(null);
        }

        readonly ConcurrentDictionary<int, TestData> concurrentCache = 
            new ConcurrentDictionary<int, TestData>();
        void DoRace(string[] args)
        {
            int max = 1000;

            // Add some key/value pairs from multiple threads.
            Task[] tasks = new Task[3];

            tasks[0] = Task.Factory.StartNew(() =>
            {

                System.Random RandNum = new System.Random();
                int MyRandomNumber = RandNum.Next(1, 500);

                Thread.Sleep(MyRandomNumber);
                AddOrUpdateWithoutRetrieving();

            });

            tasks[1] = Task.Factory.StartNew(() =>
            {
                System.Random RandNum = new System.Random();
                int MyRandomNumber = RandNum.Next(1, 1000);

                Thread.Sleep(MyRandomNumber);

                AddOrUpdateWithoutRetrieving();

            });

            tasks[2] = Task.Factory.StartNew(() =>
            {
                AddOrUpdateWithoutRetrieving();

            });
            // Wait for all three tasks to finish.
            Task.WaitAll(tasks);

            AddOrUpdateWithoutRetrieving();

            Console.WriteLine("Press any key.");
            Console.ReadKey();
        }
        public class TestData : IEqualityComparer<TestData>
        {
            public string aStr1 { get; set; }
            public Guid? aGud1 { get; set; }
            public string aStr2 { get; set; }
            public int aInt1 { get; set; }
            public long? aLong1 { get; set; }

            public DateTime aDate1 { get; set; }
            public DateTime? aDate2 { get; set; }

            //public int QueryCount { get; set; }
            public int QueryCount = 0;//

            public string zData { get; set; }
            public bool Equals(TestData x, TestData y)
            {
                return x.aStr1 == y.aStr1 &&
                    x.aStr2 == y.aStr2 &&
                       x.aGud1 == y.aGud1 &&
                       x.aStr2 == y.aStr2 &&
                       x.aInt1 == y.aInt1 &&
                       x.aLong1 == y.aLong1 &&
                       x.aDate1 == y.aDate1 &&
                       x.QueryCount == y.QueryCount ;
            }

            public int GetHashCode(TestData obj)
            {
                TestData ci = (TestData)obj;
                // http://stackoverflow.com/a/263416/328397
                return 
                  new { 
                         A = ci.aStr1, 
                         Aa = ci.aStr2, 
                         B = ci.aGud1, 
                         C = ci.aStr2, 
                         D = ci.aInt1, 
                         E = ci.aLong1, 
                         F = ci.QueryCount , 
                         G = ci.aDate1}.GetHashCode();
            }
        }
        private   void AddOrUpdateWithoutRetrieving()
        {
            // Sometime later. We receive new data from some source.
            TestData ci = new TestData() 
            { 
              aStr1 = "Austin", 
              aGud1 = new Guid(), 
              aStr2 = "System", 
              aLong1 = 100, 
              aInt1 = 1000, 
              QueryCount = 0, 
              aDate1 = DateTime.MinValue
            };

            TestData verify = concurrentCache.AddOrUpdate(123, ci,
                (key, existingVal) =>
                {
                    existingVal.aStr2 = "test1" + existingVal.QueryCount;
                    existingVal.aDate1 = DateTime.MinValue;
                    Console.WriteLine
                     ("Thread:" + Thread.CurrentThread.ManagedThreadId + 
                          "  Query Count A:" + existingVal.QueryCount);
                    Interlocked.Increment(ref existingVal.QueryCount);
                    System.Random RandNum = new System.Random();
                    int MyRandomNumber = RandNum.Next(1, 1000);

                    Thread.Sleep(MyRandomNumber);
                    existingVal.aInt1++;
                    existingVal.aDate1 = 
                         existingVal.aDate1.AddSeconds
                         (existingVal.aInt1);  
                    Console.WriteLine(
                          "Thread:" + Thread.CurrentThread.ManagedThreadId + 
                           "  Query Count B:" + existingVal.QueryCount);
                    return existingVal;
                });


            // After each run, every value here should be ++ the previous value
            Console.WriteLine(
                "Thread:"+Thread.CurrentThread.ManagedThreadId + 
                 ": Query Count returned:" + verify.QueryCount + 
                 " eid:" + verify.aInt1 + " date:" +  
                 verify.aDate1.Hour + " "  + verify.aDate1.Second + 
                 " NAME:" + verify.aStr2
                );
        }

    }
}

Output

Thread:12: Query Count returned:0 eid:1000 date:0 0 NAME:System

Thread:12  Query Count A:0
Thread:13  Query Count A:1
Thread:12  Query Count B:2
Thread:12: Query Count returned:2 eid:1001 date:0 41 NAME:test11

Thread:12  Query Count A:2
Thread:13  Query Count B:3
Thread:13: Query Count returned:3 eid:1002 date:0 42 NAME:test12

Thread:13  Query Count A:3
Thread:11  Query Count A:4
Thread:11  Query Count B:5
Thread:11: Query Count returned:5 eid:1003 date:0 43 NAME:test14

Thread:11  Query Count A:5
Thread:13  Query Count B:6
Thread:13: Query Count returned:6 eid:1004 date:0 44 NAME:test15

....

Thread:11  Query Count A:658
Thread:11  Query Count B:659
Thread:11: Query Count returned:659 eid:1656 date:0 36 NAME:test1658

Thread:11  Query Count A:659
Thread:11  Query Count B:660
Thread:11: Query Count returned:660 eid:1657 date:0 37 NAME:test1659

Thread:11  Query Count A:660
Thread:11  Query Count B:661
Thread:11: Query Count returned:661 eid:1658 date:0 38 NAME:test1660

Thread:11  Query Count A:661
Thread:11  Query Count B:662
Thread:11: Query Count returned:662 eid:1659 date:0 39 NAME:test1661

In this code, "eid" should always be exactly 1,000 more than the query count, but over the iterations the two drift apart and the gap is off by anywhere from 1 to 7. That inconsistency may cause some applications to fail or report incorrect data.

Asked by makerofthings7, May 19 '12 at 15:05



1 Answer

This submission is based on an incorrect understanding of the remarks at the bottom of the article “How to: Add and Remove Items from a ConcurrentDictionary” http://msdn.microsoft.com/en-us/library/dd997369.aspx and on a basic concurrency bug – concurrent non-atomic modification of a shared object.

First, let’s clarify what the linked article really says. I’ll use AddOrUpdate as example, but the reasoning for GetOrAdd is equivalent.

Say, you call AddOrUpdate from several threads and specify the same key. Assume that an entry with that key already exists. Each thread will come along, notice that there is already an entry with the specified key and that the Update part of AddOrUpdate is relevant. In doing so, no thread will lock the dictionary. Instead it will use some interlocked instructions to atomically check if an entry-key exists or not.

So, our several threads all noticed that the key exists and that the updateValueFactory needs to be called. That delegate is passed to AddOrUpdate; it takes the existing key and value (for a reference type, a reference to the stored object) and returns the update-value. Now, all the threads involved will call the factory concurrently. They will all complete in some previously unknown order, and every thread will then attempt an atomic compare-and-replace: store the value it just computed, but only if the dictionary still holds the value that was passed into its updateValueFactory. There is no way to know which thread will “win”. The thread that wins will get to store its computed value. The others will notice that the value in the dictionary is no longer the value that was passed into their updateValueFactory as an argument. In response to that realisation, they will throw away the just computed value, re-read the current value and call the factory again. This is exactly what you want to happen.
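The sequence described above can be sketched with the public TryGetValue, TryUpdate and TryAdd methods. This is only an illustration of the general shape, not the library's actual source; the names `OptimisticUpdateSketch` and `AddOrUpdateLoop` are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;

public static class OptimisticUpdateSketch
{
    // Hypothetical helper: read, compute outside any lock, then swap the
    // value in only if nobody else replaced it in the meantime.
    public static TValue AddOrUpdateLoop<TKey, TValue>(
        ConcurrentDictionary<TKey, TValue> dict,
        TKey key,
        TValue addValue,
        Func<TKey, TValue, TValue> update)
    {
        while (true)
        {
            TValue current;
            if (dict.TryGetValue(key, out current))
            {
                // The delegate runs without holding any dictionary lock.
                TValue candidate = update(key, current);

                // Succeeds only if the stored value still equals 'current'
                // according to the default equality comparer for TValue.
                if (dict.TryUpdate(key, candidate, current))
                    return candidate;

                // Lost the race: drop 'candidate' and go around again.
            }
            else if (dict.TryAdd(key, addValue))
            {
                return addValue;
            }
        }
    }
}
```

Note the comparison in TryUpdate. For a class that does not override Equals, it is a reference comparison, so a delegate that mutates the stored object and returns the same reference always passes the check, and concurrent in-place changes go undetected.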

Next, let's clarify why you get strange values when running the code sample listed here:

Recall that the updateValueFactory delegate passed to AddOrUpdate takes REFERENCES to the existing key and value and returns the update-value. The code sample in its AddOrUpdateWithoutRetrieving() method starts performing operations directly on that reference. Instead of creating a new replacement value and modifying THAT, it modifies instance member values of existingVal, an object that is already in the dictionary, and then simply returns that reference. And it does so non-atomically: it reads some values, updates some values, reads more, updates more. Of course, we have seen above that this happens on several threads concurrently, and they all modify the SAME object. No wonder the result is that at any one time (when the code sample calls WriteLine), the object contains member instance values that originated from different threads.

The dictionary has nothing to do with this – the code simply modifies an object that is shared between threads non-atomically. This is one of the most common concurrency bugs around. The two most common workarounds depend on the scenario. Either use a shared lock to make the entire object modification atomic, or first atomically copy the entire object and then modify the local copy.
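For the former (a shared lock), a minimal sketch follows. It assumes a simplified stand-in for TestData with only two counters; `LockedCounters`, `Bump`, `Gap` and `SharedLockSketch` are hypothetical names chosen for illustration. Every read and write of the related fields goes through the same lock, so no thread can observe a half-finished update.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical, simplified stand-in for TestData.
public sealed class LockedCounters
{
    private readonly object gate = new object();
    private int queryCount;   // plays the role of TestData.QueryCount
    private int id = 1000;    // plays the role of TestData.aInt1

    // Both fields change together under one lock.
    public void Bump()
    {
        lock (gate)
        {
            queryCount++;
            id++;
        }
    }

    // Reads both fields under the same lock, so the pair is consistent.
    public int Gap()
    {
        lock (gate)
        {
            return id - queryCount;
        }
    }
}

public static class SharedLockSketch
{
    public static int RunAndGetGap(int workers, int bumpsPerWorker)
    {
        var cache = new ConcurrentDictionary<int, LockedCounters>();

        Parallel.For(0, workers, w =>
        {
            // Under contention GetOrAdd may construct a spare instance,
            // but every caller receives the single stored one.
            LockedCounters entry = cache.GetOrAdd(123, key => new LockedCounters());
            for (int i = 0; i < bumpsPerWorker; i++)
                entry.Bump();
        });

        return cache[123].Gap();
    }
}
```

The dictionary is used only to find the shared object; the modification itself happens outside AddOrUpdate and is protected by the object's own lock.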

For the latter, try adding this to the TestData class:

// Lazily created lock object guarding Copy().
private Object _copyLock = null;

private Object GetLock() {

    if (_copyLock != null)
        return _copyLock;

    // Several threads may race to create the lock; CompareExchange
    // guarantees that all of them end up using the same instance.
    Object newLock = new Object();
    Object prevLock = Interlocked.CompareExchange(ref _copyLock, newLock, null);
    return (prevLock == null) ? newLock : prevLock;
}

public TestData Copy() {

    lock (GetLock()) {
        TestData copy = new TestData();
        copy.aStr1 = this.aStr1;
        copy.aGud1 = this.aGud1;   // was missing from the original listing
        copy.aStr2 = this.aStr2;
        copy.aLong1 = this.aLong1;
        copy.aInt1 = this.aInt1;
        copy.QueryCount = this.QueryCount;
        copy.aDate1 = this.aDate1;
        copy.aDate2 = this.aDate2;
        copy.zData = this.zData;

        return copy;
    }
}

Then modify the factory as follows:

TestData verify = concurrentCache.AddOrUpdate(123, ci,
    (key, existingVal) =>
    {
        TestData newVal = existingVal.Copy();
        newVal.aStr2 = "test1" + newVal.QueryCount;
        newVal.aDate1 = DateTime.MinValue;
        Console.WriteLine("Thread:" + Thread.CurrentThread.ManagedThreadId + "  Query Count A:" + newVal.QueryCount);
        Interlocked.Increment(ref newVal.QueryCount);
        System.Random RandNum = new System.Random();
        int MyRandomNumber = RandNum.Next(1, 1000);

        Thread.Sleep(MyRandomNumber);
        newVal.aInt1++;
        newVal.aDate1 = newVal.aDate1.AddSeconds(newVal.aInt1);
        Console.WriteLine("Thread:" + Thread.CurrentThread.ManagedThreadId + "  Query Count B:" + newVal.QueryCount);
        return newVal;
    });
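A variation on the same idea, offered only as a sketch: if the stored type is immutable, every update necessarily produces a new instance and no copy lock is needed. `Snapshot`, `Next` and `ImmutableUpdateSketch` are hypothetical names, and the two fields are simplified stand-ins for QueryCount and aInt1.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical immutable stand-in for TestData: fields never change
// after construction, so sharing an instance between threads is safe.
public sealed class Snapshot
{
    public readonly int QueryCount;
    public readonly int Id;

    public Snapshot(int queryCount, int id)
    {
        QueryCount = queryCount;
        Id = id;
    }

    // Returns a NEW object with both fields advanced together.
    public Snapshot Next()
    {
        return new Snapshot(QueryCount + 1, Id + 1);
    }
}

public static class ImmutableUpdateSketch
{
    public static Snapshot Run(int calls)
    {
        var cache = new ConcurrentDictionary<int, Snapshot>();

        Parallel.For(0, calls, i =>
        {
            // If another thread replaced the entry first, AddOrUpdate calls
            // the delegate again with the newer value, so no update is lost.
            cache.AddOrUpdate(123, new Snapshot(0, 1000), (key, old) => old.Next());
        });

        return cache[123];
    }
}
```

Exactly one call adds the initial value and each remaining call applies one update, so after `calls` invocations QueryCount is `calls - 1` and Id stays exactly 1,000 ahead of it. As with the Copy() version, the delegate may run more than once per call, so it should not have side effects that must happen exactly once.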

I hope this helps.

Answered by Gragus, Oct 05 '22 at 13:10