I have one process generating work and a second process with a BlockingCollection<>
that consumes that work. When I close my program, I need my consumer to stop consuming work, but I still need to quickly log the work that was pending but hadn't been consumed.
Right now, my consumer spawns a thread that has a foreach (<object> in BlockingCollection.GetConsumingEnumerable())
loop. When I stop my program, my producer calls Consumer.BlockingCollection.CompleteAdding()
. What I find is that my consumer continues to process everything in the queue.
Googling the issues tells me that I need to use a CancellationToken
. So I tried that out:
private void Process () { // This method runs in a separate thread
try {
foreach (*work* in BlockingCollection.GetConsumingEnumerable(CancellationToken)) {
// Consume
}
}
catch (OperationCancelledException) {
foreach (*work* in BlockingCollection.GetConsumingEnumerable()) {
// quickly log
}
}
}
My producer has:
private CancellationTokenSource StopFlag = new CancellationTokenSource ();
MyConsumer.CancellationToken = StopFlag.Token;
// Make the consumer spawn it's consuming thread...
StopFlag.Cancel ();
MyConsumer.BlockingCollection.CompleteAdding ();
When I try this, I get no indication that the OperationCancelledException ever happened.
This question tries to explain the use of a cancellation token, but it seems like it's not using it correctly. (Argument: if it works, then it's "correct enough".) And this question would appear to be an exact duplicate of my question but with no example. (Same here.)
So to reiterate: How do I properly use a CancellationToken
on BlockingCollection.GetConsumingEnumerable()
with the caveat that I need to process the remaining items in the queue after it gets cancelled using a different method?
(I think my problem is centered around the proper use of the CancellationToken. None of my testing indicates that the process is actually being cancelled. (StopFlag.IsCancellationRequested
always equals false
.))
When you pass in the CancellationToken
to GetConsumingEnumerable
it won't throw an exception of cancellation is requested, it'll just stop spitting out items. Rather than catching the exception just check the token:
foreach (var item in BlockingCollection.
GetConsumingEnumerable(CancellationToken))
{
//consume item
}
if (CancellationToken.IsCancellationRequested)
foreach (var item in BlockingCollection)
{
//log item
}
Also note that, if cancellation is requested, and it's possible that CompletedAdding
hasn't been called then you should just iterate the collection, not call GetConsumingEnumerable
. If you know that the producer will complete adding when the operation is cancelled then that's not a problem.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With