I have two BlockingCollection<T> objects, collection1 and collection2. I want to consume items from these collections, giving priority to items in collection1. That is, if both collections have items, I want to take items from collection1 first. If neither collection has items, I want to wait for an item to become available.
I have the following code:
public static T Take<T>(
    BlockingCollection<T> collection1,
    BlockingCollection<T> collection2) where T : class
{
    if (collection1.TryTake(out var item1))
    {
        return item1;
    }
    T item2;
    try
    {
        BlockingCollection<T>.TakeFromAny(
            new[] { collection1, collection2 },
            out item2);
    }
    catch (ArgumentException)
    {
        return null;
    }
    return item2;
}
This code is expected to return null when CompleteAdding is called on both collections and they are both empty.
My main issue with this code is that the documentation for the TakeFromAny method states that TakeFromAny will throw an ArgumentException if CompleteAdding was called on "the collection":
ArgumentException
The collections argument is a 0-length array or contains a null element or CompleteAdding() has been called on the collection.
Does it throw if CompleteAdding was called on any one of the collections, or only if it was called on both?
If CompleteAdding was called and the collection still has some items, does it throw?
I need a reliable way to do this.
In this code, I try to take from collection1 first because the documentation of TakeFromAny gives no guarantee about which collection the item is taken from when both collections have items.
This also means that if both collections are empty and later receive items at the same time, I might get an item from collection2 first, which is fine.
EDIT:
The reason I add items to two collections (and not simply to a single collection) is that the first collection does not have an upper bound, and the second collection does.
More details for those interested in why I need this:
I am using this in an open source project called ProceduralDataflow. See here for more details: https://github.com/ymassad/ProceduralDataflow
Each processing node in the dataflow system has two collections. One collection contains items arriving for the first time (so I need to slow down the producer if needed), and the other contains items arriving for the second (or third, ...) time as a result of a loop in the dataflow.
The reason one collection has no upper bound is that I don't want deadlocks caused by loops in the dataflow.
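A minimal sketch of the two-collection setup described above, assuming (my reading of the EDIT) that collection1 is the unbounded collection for looped-back items and collection2 is the bounded collection for first-time items. The capacity of 2 is an arbitrary illustrative value, not something from ProceduralDataflow.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical setup for one processing node (names and capacity are illustrative only):
// collection1: unbounded, for items coming back through a loop, so the sender
//              never blocks and loops cannot deadlock on a full queue.
// collection2: bounded, for items arriving for the first time, so a fast
//              producer is slowed down once the node falls behind.
var collection1 = new BlockingCollection<string>();
var collection2 = new BlockingCollection<string>(boundedCapacity: 2);

collection2.Add("new 1");
collection2.Add("new 2");

// collection2 is now full: Add would block here, so probe it with a zero-timeout TryAdd.
bool acceptedThird = collection2.TryAdd("new 3", 0);

// collection1 has no upper bound and keeps accepting looped-back items.
for (int i = 0; i < 1000; i++)
{
    collection1.Add($"loop {i}");
}

Console.WriteLine($"acceptedThird={acceptedThird}, collection1.Count={collection1.Count}");
```

In a real producer, a plain Add on collection2 would simply block until a consumer makes room, which is the intended back-pressure.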
First, short answers to your concrete questions.
Does it throw if CompleteAdding was called on any collection? or both collections?
Both (all), but only if there is no available element in any of the collections.
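A small experiment illustrating the first answer, written as a C# top-level program. It assumes nothing beyond the public BlockingCollection<T> API; the 100 ms delay is arbitrary and only there to make TakeFromAny actually wait.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

var collection1 = new BlockingCollection<string>();
var collection2 = new BlockingCollection<string>();

// Only collection1 is marked complete, and it is empty.
collection1.CompleteAdding();

// Another thread adds to collection2 a little later.
var producer = Task.Run(() =>
{
    Thread.Sleep(100);
    collection2.Add("late item");
});

// Not all collections are completed, so TakeFromAny waits for collection2
// instead of throwing ArgumentException.
int index = BlockingCollection<string>.TakeFromAny(
    new[] { collection1, collection2 }, out var item);
producer.Wait();

Console.WriteLine($"index={index}, item={item}");
```

The returned index refers to the position in the array passed in, so it identifies collection2 even though collection1 was skipped.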
What if CompleteAdding was called and the collection still has some items, does it throw?
No. If there is an available element in the collection, it will be removed from the collection and returned to the caller.
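The second answer can be checked the same way. This sketch marks both collections complete while collection1 still holds one item: the first TakeFromAny returns that item, and only the next call, when everything is completed and empty, throws.

```csharp
using System;
using System.Collections.Concurrent;

var collection1 = new BlockingCollection<string>();
var collection2 = new BlockingCollection<string>();
var both = new[] { collection1, collection2 };

// collection1 still holds one item when adding is completed on both.
collection1.Add("left over");
collection1.CompleteAdding();
collection2.CompleteAdding();

// First call: no exception, the remaining item is removed and returned.
int index = BlockingCollection<string>.TakeFromAny(both, out var item);

// Second call: every collection is now completed and empty, so it throws.
bool threw = false;
try
{
    BlockingCollection<string>.TakeFromAny(both, out _);
}
catch (ArgumentException)
{
    threw = true;
}

Console.WriteLine($"index={index}, item={item}, threw={threw}");
```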
Conclusion
Apparently the documentation is unclear. The part
"or CompleteAdding() has been called on the collection"
should have been formulated differently, something like
"or there is no available element in any of the collections and CompleteAdding() has been called on all the collections"
Rationale
Well, I know relying on the implementation is not good practice, but when the documentation is unclear, the implementation is the only reliable and official source I can think of. So, looking at the reference source, both TakeFromAny and TryTakeFromAny call a private method, TryTakeFromAnyCore. It starts with the following:
ValidateCollectionsArray(collections, false);
Here false is the bool argument isAddOperation, which is used inside ValidateCollectionsArray as follows:
if (isAddOperation && collections[i].IsAddingCompleted)
{
    throw new ArgumentException(
        SR.GetString(SR.BlockingCollection_CantAddAnyWhenCompleted), "collections");
}
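This is one of the possible places that throw ArgumentException for collections on which CompleteAdding() has been called. But since isAddOperation is false for take operations, this check never fires for TakeFromAny, so a single completed collection does not make it throw here (question #1).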
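The isAddOperation flag explains an asymmetry that is easy to observe from the public API. In this sketch the same array is passed to AddToAny, which validates with isAddOperation set to true and rejects it because collection1 is completed, and then to TakeFromAny, which accepts it.

```csharp
using System;
using System.Collections.Concurrent;

var collection1 = new BlockingCollection<string>();
var collection2 = new BlockingCollection<string>();
var both = new[] { collection1, collection2 };

collection2.Add("item");
collection1.CompleteAdding();

// Add path: one completed collection is enough to throw.
bool addThrew = false;
try
{
    BlockingCollection<string>.AddToAny(both, "new item");
}
catch (ArgumentException)
{
    addThrew = true;
}

// Take path: same array, no exception; collection1 is simply skipped.
int index = BlockingCollection<string>.TakeFromAny(both, out var item);

Console.WriteLine($"addThrew={addThrew}, index={index}, item={item}");
```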
Then the implementation continues with the following "fast path":
//try the fast path first
for (int i = 0; i < collections.Length; i++)
{
    // Check if the collection is not completed, and potentially has at least one element by checking the semaphore count
    if (!collections[i].IsCompleted && collections[i].m_occupiedNodes.CurrentCount > 0 && collections[i].TryTake(out item))
        return i;
}
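This answers question #2: the fast path skips a collection only when IsCompleted is true, i.e. adding has been completed and the collection is empty, so a collection that was marked complete but still holds items is still taken from.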
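The distinction the fast path relies on is the one between IsAddingCompleted and IsCompleted. A minimal illustration on a single collection:

```csharp
using System;
using System.Collections.Concurrent;

var collection = new BlockingCollection<int>();
collection.Add(42);
collection.CompleteAdding();

// Adding is finished, but one item is still buffered, so the collection is not "completed" yet.
bool addingCompleted = collection.IsAddingCompleted;
bool completedBefore = collection.IsCompleted;

// Taking from a collection marked complete still works while items remain.
int taken = collection.Take();

// IsCompleted becomes true only once adding is finished AND the collection is empty.
bool completedAfter = collection.IsCompleted;

Console.WriteLine($"addingCompleted={addingCompleted}, completedBefore={completedBefore}, taken={taken}, completedAfter={completedAfter}");
```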
Finally, if there is no available element in any of the collections, the implementation takes the "slow path" by calling another private method, TryTakeFromAnyCoreSlow, whose opening comment is the essential explanation of the implemented behavior:
//Loop until one of these conditions is met:
// 1- The operation is succeeded
// 2- The timeout expired for try* versions
// 3- The external token is cancelled, throw
// 4- The operation is TryTake and all collections are marked as completed, return false
// 5- The operation is Take and all collection are marked as completed, throw
The answer to both our questions is in case #1 and case #5 (note the word all). By the way, it also shows the only difference between TakeFromAny and TryTakeFromAny: case #4 vs. #5, i.e. throwing vs. returning -1.
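Given case #4, one option is to rewrite the question's helper on top of TryTakeFromAny with an infinite timeout, so the "all completed and empty" condition shows up as a -1 return value instead of an exception. This is a sketch of that idea, not code from the reference source; it uses the existing TryTakeFromAny(collections, out item, int millisecondsTimeout) overload, and the usage values are illustrative.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Variant of the question's Take<T>: returns null only when both collections
// are marked complete and empty, without catching ArgumentException.
static T Take<T>(
    BlockingCollection<T> collection1,
    BlockingCollection<T> collection2) where T : class
{
    // Prefer collection1 when it already has an item.
    if (collection1.TryTake(out var item1))
    {
        return item1;
    }

    // Wait on both; with an infinite timeout, -1 means every collection is completed and empty.
    int index = BlockingCollection<T>.TryTakeFromAny(
        new[] { collection1, collection2 }, out var item2, Timeout.Infinite);
    return index >= 0 ? item2 : null;
}

var collection1 = new BlockingCollection<string>();
var collection2 = new BlockingCollection<string>(boundedCapacity: 10);
collection2.Add("b");
collection1.Add("a");
collection1.CompleteAdding();
collection2.CompleteAdding();

string first = Take(collection1, collection2);
string second = Take(collection1, collection2);
string third = Take(collection1, collection2);

Console.WriteLine($"{first}, {second}, {third ?? "null"}");
```

As in the original code, the priority for collection1 only applies at the initial TryTake; once both collections are waited on together, either one may supply the next item, which the question already accepts.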