I have some code which processes a number of response objects from my database in parallel (using AsParallel()). Each response has many components. The responses may share the same components. I do some modifications to the component data and save it to the db, so I need to prevent multiple threads working on the same component object at the same time.
I use locks to achieve this. I have a ConcurrentDictionary<int, object>
to hold all the necessary lock objects. Like this:
    private static ConcurrentDictionary<int, object> compLocks = new ConcurrentDictionary<int, object>();

    var compIds = db.components.Select(c => c.component_id).ToList();
    foreach (var compId in compIds)
    {
        compLocks[compId] = new object();
    }
Then later on I do this:
    responses.AsParallel().ForAll(r =>
    {
        ... do some time consuming stuff with web services ...

        // this is a *just in case* addition,
        // in case a new component was added to
        // the db since the dictionary was constructed
        // NOTE: it did not have any effect, and I'm no longer
        // using it as @Henk pointed out it is not thread-safe.
        //if (compLocks[c.component_id] == null)
        //{
        //    compLocks[c.component_id] = new object();
        //}

        componentList.AsParallel().ForAll(c =>
        {
            lock (compLocks[c.component_id])
            {
                ... do some processing, save the db records ...
            }
        });
    });
This seems to run perfectly fine, but towards the end of program execution (it runs for several hours as there is a lot of data) I get the following exception:
    Unhandled Exception: System.AggregateException: One or more errors occurred. ---> System.Collections.Generic.KeyNotFoundException: The given key was not present in the dictionary.
       at System.Collections.Concurrent.ConcurrentDictionary`2.get_Item(TKey key)
I am sure that the ConcurrentDictionary is being populated with every possible component ID.
I have 3 questions:
1) How is this exception even possible?
2) Do I need a ConcurrentDictionary for this?
3) Is my understanding of how locking works correct in this instance / is there a better way of doing this?

Edit: To make it clear what the cause of all this was: .AsParallel() doesn't enumerate the collection of responses up front. It's lazily evaluated, meaning new responses (and therefore new components) can be added to the collection at run time (from other processes). Enforcing a snapshot with .ToList() before the .AsParallel() fixed the problem.
The reason my code for adding component IDs to compLocks at run time didn't remedy this problem is that it is not thread-safe.
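To illustrate the lazy evaluation described above, here is a minimal in-memory sketch. It uses a plain List<int> as a stand-in for the database query, so the names SnapshotDemo, source, lazy and snapshot are assumptions for illustration only and not part of the original code. The deferred query sees an item added after it was defined, while the ToList() snapshot does not.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SnapshotDemo
{
    // Returns how many items the deferred query and the snapshot each see
    // after the underlying source has grown.
    public static (int Lazy, int Snapshot) Run()
    {
        // Hypothetical stand-in for the ids coming from the database.
        var source = new List<int> { 1, 2, 3 };

        // Deferred: nothing is enumerated yet, only the query is defined.
        IEnumerable<int> lazy = source.Where(id => id > 0);

        // Materialised: the current contents are copied right now.
        List<int> snapshot = lazy.ToList();

        // Simulates another process adding a row while the job is running.
        source.Add(4);

        // The deferred query re-reads the source and sees 4 items;
        // the snapshot still holds the original 3.
        return (lazy.Count(), snapshot.Count);
    }
}
```

In the real program the same idea would amount to calling .ToList() on responses before .AsParallel(), so that the set of component IDs being processed matches the set the lock dictionary was built from.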
1) How is this exception even possible?
Apparently it is, but not from the posted code alone. It would happen if data is added to the db while the program is running. Would it be an option to capture responses with a ToList() beforehand?
2) Do I need a ConcurrentDictionary for this?
Not with a fixed list, but when the solution involves add-when-missing then yes, you need a Concurrent collection.
3) Is my understanding of how locking works correct in this instance / is there a better way of doing this?
Not totally sure. The locking looks OK but you will still do the processing of duplicates multiple times. Just not at the same time.
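If the per-component work does not depend on which response the component came from (an assumption, since the question does not say), one way to avoid the repeated processing mentioned above is to collect the distinct components first and process each one once. This is only a sketch: the Component and Response classes and the ProcessOnce helper are hypothetical stand-ins for the real entity types.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins for the database entities.
public sealed class Component
{
    public int ComponentId { get; set; }
}

public sealed class Response
{
    public List<Component> Components { get; set; } = new List<Component>();
}

public static class DistinctComponents
{
    // Returns the ids that were processed, each exactly once, in ascending order.
    public static List<int> ProcessOnce(IEnumerable<Response> responses)
    {
        // Flatten all components and keep one representative per id.
        // ToList() also takes a snapshot before the parallel part starts.
        List<Component> distinct = responses
            .SelectMany(r => r.Components)
            .GroupBy(c => c.ComponentId)
            .Select(g => g.First())
            .ToList();

        var processed = new ConcurrentBag<int>();

        // No per-component lock is needed here, because each id occurs only once.
        distinct.AsParallel().ForAll(c =>
        {
            // ... the real processing and db save would go here ...
            processed.Add(c.ComponentId);
        });

        return processed.OrderBy(id => id).ToList();
    }
}
```

If the processing does depend on the owning response, this shortcut does not apply and the per-component locks remain the right tool.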
Reaction to the edit:
    if (compLocks[c.component_id] == null)
    {
        compLocks[c.component_id] = new object();
    }
This is not thread-safe. It is now possible that multiple lock objects are created for one component_id value. You need to use one of the GetOrAdd() methods.
But I would not expect this to give the exception you're getting, so it's probably not the direct problem.
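A minimal sketch of the GetOrAdd() approach, assuming the lock objects can be created on demand. The ComponentLocks class, the LockFor helper and the counts dictionary are hypothetical names used only for this illustration. Note that reading a missing key through the indexer throws KeyNotFoundException rather than returning null, which is another reason to go through GetOrAdd().

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

public static class ComponentLocks
{
    private static readonly ConcurrentDictionary<int, object> compLocks =
        new ConcurrentDictionary<int, object>();

    // Returns the single lock object for this id, creating it if it is missing.
    // The factory may run more than once under contention, but only one object
    // is stored, and every caller receives that stored object.
    public static object LockFor(int componentId)
    {
        return compLocks.GetOrAdd(componentId, _ => new object());
    }

    // Counts how often each id was processed, to show the locks in use.
    public static ConcurrentDictionary<int, int> ProcessAll(int[] componentIds)
    {
        var counts = new ConcurrentDictionary<int, int>();

        componentIds.AsParallel().ForAll(id =>
        {
            lock (LockFor(id))
            {
                // Only one thread at a time can be here for a given id,
                // so this read-modify-write on counts[id] is safe.
                counts.TryGetValue(id, out var seen);
                counts[id] = seen + 1;
            }
        });

        return counts;
    }
}
```

For example, ComponentLocks.ProcessAll(new[] { 1, 1, 2, 3, 3, 3 }) yields a count of 2 for id 1, 1 for id 2 and 3 for id 3, and an id that was never pre-registered no longer causes an exception.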