ConcurrentBag<T> getting duplicates (seems not to be thread safe)

I must be doing something wrong somewhere, because I am getting duplicate items in my ConcurrentBag. Here is the chain of events:

  var listings = new ConcurrentBag<JSonListing>();
  Parallel.ForEach(Types, ParallelOptions, (type, state) =>
  {
      ...
      status = ProcessType(listings, status, type, state);
      ....
   });

  private GeocodeResults ProcessType(ConcurrentBag<JSonListing> listings, GeocodeResults status, XElement type, ParallelLoopState state)
  {
      ....
      AddListingsToList(results, type, listings);
      .... 
  }

  private void AddListingsToList(dynamic results, XElement type, ConcurrentBag<JSonListing> listings)
  {
      var typeMustMatch = type.Attribute("TypeMustMatch").Value;
      var typeID = Convert.ToInt32(type.Attribute("ID").Value);

      foreach (var result in results.results)
      {
          var foundListing = listings.SingleOrDefault(x => x.ID == result.id);
          if (foundListing != null)
          {
              var typeIDs = foundListing.TypeIDs.Split("|".ToCharArray(), StringSplitOptions.RemoveEmptyEntries).ToList();
              if (!typeIDs.Contains(typeID.ToString()))
              {
                  foundListing.TypeIDs += "|" + typeID;
              }
          }
          else
          {
              var listing = new JSonListing { ID = result.id, ReferenceNumber = result.reference, TypeIDs = typeID.ToString(), TypeMustMatch = typeMustMatch };
              listings.Add(listing);
          }
      }
  }

AddListingsToList should guarantee that if an element with the same ID already exists, another one is not added; instead, some property of the existing element is updated. The error I get is:

System.InvalidOperationException: Sequence contains more than one matching element
at System.Linq.Enumerable.SingleOrDefault[TSource](IEnumerable`1 source, Func`2 predicate)
at LocalSearch.Processor.CityProcessor.AddListingsToList(Object results, XElement type, ConcurrentBag`1 listings) in d:\Projects\ListingLocator2\Code\LocalSearch.Processor\Processors.cs:line 310
at CallSite.Target(Closure , CallSite , CityProcessor , Object , XElement , ConcurrentBag`1 )
at LocalSearch.Processor.CityProcessor.ProcessType(ConcurrentBag`1 listings, GeocodeResults status, XElement type, ParallelLoopState state) in d:\Projects\ListingLocator2\Code\LocalSearch.Processor\Processors.cs:line 249
at LocalSearch.Processor.CityProcessor.<>c__DisplayClass4.b__0(XElement type, ParallelLoopState state) in d:\Projects\ListingLocator2\Code\LocalSearch.Processor\Processors.cs:line 137

Zoinky asked Feb 07 '14 12:02



1 Answer

ConcurrentBag guarantees that each operation on it is thread-safe when considered on its own. It does not guarantee that multiple operations in succession will be treated as an atomic group.

As a result your code has a race condition: you check if the bag already contains some item X, but two threads can run the test concurrently, decide that the item isn't there, and proceed to add it. End result: two copies of the item end up being in the bag.
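To make the race concrete, here is a minimal sketch of the check-then-add pattern on two threads. The class name CheckThenAddDemo and the value 42 are made up for illustration, and the Barrier is only there to force the unlucky interleaving on every run; in real code the same interleaving happens by chance.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;

// Hypothetical demo class, not part of the question's code.
public static class CheckThenAddDemo
{
    // Runs "check, then add" on two threads and returns the final bag size.
    public static int Run()
    {
        var bag = new ConcurrentBag<int>();

        // Both threads must reach the barrier before either continues,
        // so both checks complete before any Add happens.
        using (var barrier = new Barrier(2))
        {
            ThreadStart worker = () =>
            {
                bool exists = bag.Any(x => x == 42); // thread-safe on its own
                barrier.SignalAndWait();             // both threads saw "not there"
                if (!exists)
                {
                    bag.Add(42);                     // thread-safe on its own
                }
            };

            var first = new Thread(worker);
            var second = new Thread(worker);
            first.Start();
            second.Start();
            first.Join();
            second.Join();
        }

        return bag.Count;
    }
}
```

Calling CheckThenAddDemo.Run() returns 2: each individual call on the bag behaved correctly, but the check and the add were not one atomic step, so the item ends up in the bag twice.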

It looks like your use case would be better implemented by using a ConcurrentDictionary instead and leveraging the TryAdd method, which is atomic. Alternatively you could put a lock() around the bag to make everything inside the block operate atomically, but then you don't really need a concurrent collection and can use a straight List instead.
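As a rough sketch of the dictionary approach: since the question needs "add, or else update the existing entry", one option is ConcurrentDictionary.AddOrUpdate keyed by the listing ID, rather than TryAdd alone. The Listing class below is a hypothetical, simplified stand-in for JSonListing, and ListingStore, AddOrMerge and Demo are made-up names. The update delegate returns a new object instead of mutating the existing one, because AddOrUpdate may invoke its delegates more than once under contention.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical, simplified stand-in for the question's JSonListing.
public sealed class Listing
{
    public Listing(int id, string typeIDs) { ID = id; TypeIDs = typeIDs; }
    public int ID { get; private set; }
    public string TypeIDs { get; private set; }
}

public static class ListingStore
{
    // Adds a listing for the ID, or merges typeID into the existing one.
    // The dictionary applies the add-or-update decision atomically per key.
    public static void AddOrMerge(ConcurrentDictionary<int, Listing> listings, int id, int typeID)
    {
        string type = typeID.ToString();
        listings.AddOrUpdate(
            id,
            key => new Listing(key, type),                     // key not present yet
            (key, existing) =>
                existing.TypeIDs.Split('|').Contains(type)
                    ? existing                                 // type already recorded
                    : new Listing(key, existing.TypeIDs + "|" + type));
    }

    // Illustrative driver: 1000 parallel calls spread over 10 IDs and 3 type IDs.
    public static ConcurrentDictionary<int, Listing> Demo()
    {
        var listings = new ConcurrentDictionary<int, Listing>();
        Parallel.For(0, 1000, i => AddOrMerge(listings, i % 10, i % 3));
        return listings;
    }
}
```

With this shape there is at most one entry per ID by construction, and listings.Values gives the final collection once the parallel loop has finished. The order of the type IDs inside TypeIDs depends on thread scheduling.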

Jon answered Oct 06 '22 22:10
