
Cancelling BlockingCollection.GetConsumingEnumerable() and processing what's left

I have one process generating work and a second process with a BlockingCollection<> that consumes that work. When I close my program, I need my consumer to stop consuming work, but I still need to quickly log the work that was pending but hadn't been consumed.

Right now, my consumer spawns a thread that has a foreach (<object> in BlockingCollection.GetConsumingEnumerable()) loop. When I stop my program, my producer calls Consumer.BlockingCollection.CompleteAdding(). What I find is that my consumer continues to process everything in the queue.

Googling the issue tells me that I need to use a CancellationToken. So I tried that out:

private void Process () { // This method runs in a separate thread
    try {
        foreach (var work in BlockingCollection.GetConsumingEnumerable(CancellationToken)) {
            // Consume
        }
    }
    catch (OperationCanceledException) {
        foreach (var work in BlockingCollection.GetConsumingEnumerable()) {
            // quickly log
        }
    }
}

My producer has:

private CancellationTokenSource StopFlag = new CancellationTokenSource ();
MyConsumer.CancellationToken = StopFlag.Token;
// Make the consumer spawn its consuming thread...
StopFlag.Cancel ();
MyConsumer.BlockingCollection.CompleteAdding ();

When I try this, I get no indication that the OperationCanceledException ever happened.

This question tries to explain the use of a cancellation token, but it seems like it's not using it correctly. (Argument: if it works, then it's "correct enough".) And this question would appear to be an exact duplicate of my question but with no example. (Same here.)

So to reiterate: How do I properly use a CancellationToken on BlockingCollection.GetConsumingEnumerable() with the caveat that I need to process the remaining items in the queue after it gets cancelled using a different method?

(I think my problem is centered around the proper use of the CancellationToken. None of my testing indicates that the process is actually being cancelled. (StopFlag.IsCancellationRequested always equals false.))

Jason asked Nov 11 '13 17:11


1 Answer

When you pass in the CancellationToken to GetConsumingEnumerable it won't throw an exception if cancellation is requested, it'll just stop spitting out items. Rather than catching the exception, just check the token:

foreach (var item in BlockingCollection.
    GetConsumingEnumerable(CancellationToken))
{
    //consume item
}
if (CancellationToken.IsCancellationRequested)
    foreach (var item in BlockingCollection)
    {
        //log item
    }

Also note that if cancellation is requested and CompleteAdding may not have been called yet, you should just iterate the collection rather than call GetConsumingEnumerable, because GetConsumingEnumerable blocks waiting for more items until adding is completed. If you know that the producer will complete adding when the operation is cancelled, then that's not a problem.
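For anyone who wants to try this end to end, here is a minimal, self-contained sketch of the consume-then-log pattern. The names DrainDemo, Process and the consume delegate are hypothetical, and the items are plain ints for illustration. One caveat worth checking against your runtime: the .NET documentation for GetConsumingEnumerable(CancellationToken) lists OperationCanceledException as thrown when the token is canceled, so the sketch catches it and also checks the token afterward, which covers both behaviors. It drains with TryTake, which removes items and never blocks, whereas a plain foreach over the collection enumerates without removing anything.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class DrainDemo
{
    // Consumes items until the token is canceled, then logs whatever is left.
    // Returns both lists so the caller can inspect what happened to each item.
    public static (List<int> Consumed, List<int> Logged) Process(
        BlockingCollection<int> queue, CancellationToken token, Action<int> consume)
    {
        var consumed = new List<int>();
        var logged = new List<int>();
        try
        {
            foreach (var item in queue.GetConsumingEnumerable(token))
            {
                consume(item);
                consumed.Add(item);
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation surfaced as an exception; fall through to the drain step.
        }

        if (token.IsCancellationRequested)
        {
            // TryTake returns false as soon as the queue is empty, so this
            // works whether or not CompleteAdding has been called.
            while (queue.TryTake(out var leftover))
            {
                logged.Add(leftover); // stand-in for "quickly log"
            }
        }
        return (consumed, logged);
    }

    public static void Main()
    {
        using var queue = new BlockingCollection<int>();
        using var stopFlag = new CancellationTokenSource();
        for (var i = 1; i <= 5; i++) queue.Add(i);

        // Hypothetical consume step: request cancellation while handling item 2.
        var result = Process(queue, stopFlag.Token, item =>
        {
            if (item == 2) stopFlag.Cancel();
        });

        Console.WriteLine("consumed: " + string.Join(",", result.Consumed));
        Console.WriteLine("logged:   " + string.Join(",", result.Logged));
    }
}
```

In this single-threaded setup, items 1 and 2 are consumed and items 3, 4 and 5 end up in the logged list. In a real program the producer would call Cancel from another thread, and items added after the drain loop finishes would not be logged unless the producer stops adding first.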

Servy answered Oct 07 '22 01:10