I must be doing something wrong somewhere, because I am getting duplicate items in my ConcurrentBag. Here is the chain of events:
var listings = new ConcurrentBag<JSonListing>();
Parallel.ForEach(Types, ParallelOptions, (type, state) =>
{
    ...
    status = ProcessType(listings, status, type, state);
    ....
});

private GeocodeResults ProcessType(ConcurrentBag<JSonListing> listings, GeocodeResults status, XElement type, ParallelLoopState state)
{
    ....
    AddListingsToList(results, type, listings);
    ....
}

private void AddListingsToList(dynamic results, XElement type, ConcurrentBag<JSonListing> listings)
{
    var typeMustMatch = type.Attribute("TypeMustMatch").Value;
    var typeID = Convert.ToInt32(type.Attribute("ID").Value);
    foreach (var result in results.results)
    {
        var foundListing = listings.SingleOrDefault(x => x.ID == result.id);
        if (foundListing != null)
        {
            var typeIDs = foundListing.TypeIDs.Split("|".ToCharArray(), StringSplitOptions.RemoveEmptyEntries).ToList();
            if (!typeIDs.Contains(typeID.ToString()))
            {
                foundListing.TypeIDs += "|" + typeID;
            }
        }
        else
        {
            var listing = new JSonListing { ID = result.id, ReferenceNumber = result.reference, TypeIDs = typeID.ToString(), TypeMustMatch = typeMustMatch };
            listings.Add(listing);
        }
    }
}
AddListingsToList should guarantee that if an element with the same ID already exists, it does not add another one but instead updates a property on the existing element. The error I get is:
System.InvalidOperationException: Sequence contains more than one matching element
at System.Linq.Enumerable.SingleOrDefault[TSource](IEnumerable`1 source, Func`2 predicate)
at LocalSearch.Processor.CityProcessor.AddListingsToList(Object results, XElement type, ConcurrentBag`1 listings) in d:\Projects\ListingLocator2\Code\LocalSearch.Processor\Processors.cs:line 310
at CallSite.Target(Closure , CallSite , CityProcessor , Object , XElement , ConcurrentBag`1 )
at LocalSearch.Processor.CityProcessor.ProcessType(ConcurrentBag`1 listings, GeocodeResults status, XElement type, ParallelLoopState state) in d:\Projects\ListingLocator2\Code\LocalSearch.Processor\Processors.cs:line 249
at LocalSearch.Processor.CityProcessor.<>c__DisplayClass4.b__0(XElement type, ParallelLoopState state) in d:\Projects\ListingLocator2\Code\LocalSearch.Processor\Processors.cs:line 137
ConcurrentBag guarantees that each operation on it is thread-safe when considered on its own. It does not guarantee that multiple operations in succession will be treated as an atomic group.
As a result your code has a race condition: you check if the bag already contains some item X, but two threads can run the test concurrently, decide that the item isn't there, and proceed to add it. End result: two copies of the item end up being in the bag.
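To make the interleaving concrete, here is a small sketch that replays it deterministically on a single thread. The class and method names (RaceDemo, SimulateInterleaving) and the ID value are made up for illustration; the two "threads" are just two sequences of steps executed in the problematic order.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

public static class RaceDemo
{
    // Replays, on one thread, the order of steps that two real threads
    // can hit when each one checks the bag first and adds afterwards.
    public static int SimulateInterleaving()
    {
        var bag = new ConcurrentBag<int>();
        const int id = 42; // arbitrary example ID

        // Step 1: "thread A" checks; the bag is empty, so it decides to add.
        bool threadASawNothing = !bag.Any(x => x == id);

        // Step 2: "thread B" checks before A has added; it reaches the same decision.
        bool threadBSawNothing = !bag.Any(x => x == id);

        // Step 3: both act on a decision that is now stale.
        if (threadASawNothing) bag.Add(id);
        if (threadBSawNothing) bag.Add(id);

        // Number of copies of the same ID now in the bag.
        return bag.Count(x => x == id);
    }
}
```

SimulateInterleaving() returns 2, and that is exactly the state in which a later SingleOrDefault call throws "Sequence contains more than one matching element". Each individual Any and Add call was thread-safe; the gap between them is the problem.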
It looks like your use case would be better implemented by using a ConcurrentDictionary instead and leveraging the TryAdd method, which is atomic. Alternatively you could put a lock() around the bag to make everything inside the block operate atomically, but then you don't really need a concurrent collection and can use a straight List instead.
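A minimal sketch of the ConcurrentDictionary approach might look like the following. It assumes a cut-down JSonListing with only the ID and TypeIDs properties; ListingStore and AddOrMerge are hypothetical names, and the per-listing lock for the update path is my own addition, since appending to TypeIDs is itself a read-modify-write that TryAdd does not protect. It also assumes nothing removes entries from the dictionary while the loop runs.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Simplified stand-in for the JSonListing class in the question (assumed shape).
public class JSonListing
{
    public int ID { get; set; }
    public string TypeIDs { get; set; }
}

public static class ListingStore
{
    // The dictionary is keyed by listing ID, so at most one entry per ID can exist.
    public static void AddOrMerge(ConcurrentDictionary<int, JSonListing> listings, int id, int typeID)
    {
        var candidate = new JSonListing { ID = id, TypeIDs = typeID.ToString() };

        // TryAdd is atomic: for a given key, exactly one caller gets true.
        if (listings.TryAdd(id, candidate))
            return;

        // Someone else already added this ID, so merge our type into that entry.
        var existing = listings[id];
        lock (existing) // guards the read-modify-write on TypeIDs
        {
            var typeIDs = existing.TypeIDs.Split(new[] { '|' }, StringSplitOptions.RemoveEmptyEntries);
            if (!typeIDs.Contains(typeID.ToString()))
                existing.TypeIDs += "|" + typeID;
        }
    }
}
```

Inside the Parallel.ForEach body you would then call something like ListingStore.AddOrMerge(listings, result.id, typeID) for each result, and read listings.Values once the loop has finished.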