
How to take an item from any two BlockingCollections with priority to the first collection?

I have two BlockingCollection<T> objects, collection1 and collection2. I want to consume items from these collections, giving priority to items in collection1. That is, if both collections have items, I want to take items from collection1 first. If neither has items, I want to wait until an item becomes available.

I have the following code:

public static T Take<T>(
    BlockingCollection<T> collection1,
    BlockingCollection<T> collection2) where T:class
{
    if (collection1.TryTake(out var item1))
    {
        return item1;
    }

    T item2;

    try
    {
        BlockingCollection<T>.TakeFromAny(
            new[] { collection1, collection2 },
            out item2);
    }
    catch (ArgumentException)
    {
        return null;
    }

    return item2;
}

This code is expected to return null when CompleteAdding is called on both collections and they both are empty.

My main issue with this code is that the documentation for the TakeFromAny method specifies that TakeFromAny will throw an ArgumentException if CompleteAdding was called on "the collection":

ArgumentException

The collections argument is a 0-length array or contains a null element or CompleteAdding() has been called on the collection.

Does it throw if CompleteAdding was called on any collection? or both collections?

What if CompleteAdding was called and the collection still has some items, does it throw?

I need a reliable way to do this.

In this code I am trying to get from collection1 first because the documentation of TakeFromAny does not give any guarantees about the collection order from which to take the item if the two collections have items.

This also means that if both collections are empty, and then they receive items at the same time later, then I might get an item from collection2 first, which is fine.

EDIT:

The reason I add items to two collections (and not simply a single collection) is that the first collection does not have an upper-bound, and the second collection does.

More details for those who are interested why I need this:

I am using this in an open source project called ProceduralDataflow. See https://github.com/ymassad/ProceduralDataflow for more details.

Each processing node in the dataflow system has two collections: one contains items arriving for the first time (so I need to slow down the producer if needed), and the other contains items arriving for the second (or third, ...) time (as a result of a loop in the dataflow).

The reason why one collection does not have an upper-bound is that I don't want to have deadlocks as a result of loops in the dataflow.

asked Aug 29 '17 20:08 by Yacoub Massad


1 Answer

First, short answers to your concrete questions.

Does it throw if CompleteAdding was called on any collection? or both collections?

Both (all), and only if there is no available element in any of the collections.

What if CompleteAdding was called and the collection still has some items, does it throw?

No. If there is an available element in the collection, it will be removed from the collection and returned to the caller.
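Both short answers can be checked with a small experiment. The following is only a minimal sketch: the class and method names (TakeFromAnyDemo and its two methods) are made up for illustration, and the behavior it exercises is what the reference source discussed in this answer implements, not something the documentation spells out.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical demo class; the names are illustrative only.
public static class TakeFromAnyDemo
{
    // Collection "a" is completed and empty, collection "b" is completed
    // but still holds one item. Returns whatever TakeFromAny hands back.
    public static string TakeWhileOneCollectionHasItems()
    {
        var a = new BlockingCollection<string>();
        var b = new BlockingCollection<string>();
        a.CompleteAdding();      // completed and empty
        b.Add("pending");
        b.CompleteAdding();      // completed, but one item is left

        // No exception here: an item is still available in b.
        BlockingCollection<string>.TakeFromAny(new[] { a, b }, out var item);
        return item;
    }

    // Returns true if TakeFromAny throws ArgumentException when every
    // collection is both completed and empty.
    public static bool ThrowsWhenAllCompletedAndEmpty()
    {
        var a = new BlockingCollection<string>();
        var b = new BlockingCollection<string>();
        a.CompleteAdding();
        b.CompleteAdding();

        try
        {
            BlockingCollection<string>.TakeFromAny(new[] { a, b }, out _);
            return false;
        }
        catch (ArgumentException)
        {
            return true;
        }
    }
}
```

The first method covers question #2 (items left in a collection after CompleteAdding are still returned), the second covers question #1 (the exception appears only once all collections are completed and empty).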

Conclusion

Apparently the documentation is unclear. The part

or CompleteAdding() has been called on the collection

should have been formulated differently - something like

or there is no available element in any of the collections and CompleteAdding() has been called on all the collections

Rationale

Well, I know relying on the implementation is not a good practice, but when the documentation is unclear, the implementation is the only reliable and official source I can think of. So taking the reference source, both TakeFromAny and TryTakeFromAny call a private method TryTakeFromAnyCore. It starts with the following:

ValidateCollectionsArray(collections, false);

false here is a bool argument called isAddOperation and is used inside the ValidateCollectionsArray as follows:

if (isAddOperation && collections[i].IsAddingCompleted)
{
    throw new ArgumentException(
        SR.GetString(SR.BlockingCollection_CantAddAnyWhenCompleted), "collections");
}

This is one of the places that could throw ArgumentException for collections on which CompleteAdding() has been called. But since isAddOperation is false for take operations, the check is skipped, so this is not where the exception comes from (question #1).

Then the implementation continues with the following "fast path":

//try the fast path first
for (int i = 0; i < collections.Length; i++)
{
    // Check if the collection is not completed, and potentially has at least one element by checking the semaphore count
    if (!collections[i].IsCompleted && collections[i].m_occupiedNodes.CurrentCount > 0 && collections[i].TryTake(out item))
        return i;
}

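This proves the answer to question #2: a collection on which CompleteAdding() has been called but which still holds items is not yet IsCompleted, so its items are still taken.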

Finally, if there is no available element in any of the collections, the implementation takes the "slow path" by calling another private method, TryTakeFromAnyCoreSlow. The following comment in that method is the essential explanation of the implemented behavior:

//Loop until one of these conditions is met:
// 1- The operation is succeeded
// 2- The timeout expired for try* versions
// 3- The external token is cancelled, throw
// 4- The operation is TryTake and all collections are marked as completed, return false
// 5- The operation is Take and all collection are marked as completed, throw

The answer to both our questions is in case #1 and case #5 (note the word all). Btw, it also shows the only difference between TakeFromAny and TryTakeFromAny - case #4 vs #5, i.e. throw vs return -1.
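Case #4 suggests a way to write the method from the question without catching an exception: call TryTakeFromAny with an infinite timeout and treat -1 as "both collections are completed and empty". This is a sketch under that assumption, not a tested drop-in replacement; the names PriorityTake and TakeWithPriority are hypothetical.

```csharp
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper class; the names are not from the question.
public static class PriorityTake
{
    public static T TakeWithPriority<T>(
        BlockingCollection<T> collection1,
        BlockingCollection<T> collection2) where T : class
    {
        // Non-blocking attempt on the high-priority collection first.
        if (collection1.TryTake(out var item))
        {
            return item;
        }

        // Blocks until an item is available in either collection.
        // Per case #4, it returns -1 instead of throwing once both
        // collections are completed and empty.
        int index = BlockingCollection<T>.TryTakeFromAny(
            new[] { collection1, collection2 },
            out item,
            Timeout.Infinite);

        return index < 0 ? null : item;
    }
}
```

As in the original code, priority is only enforced by the initial TryTake. If both collections are empty and then receive items at about the same time, the blocking call may still return an item from collection2 first.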

answered Nov 18 '22 03:11 by Ivan Stoev