
How can BlockingCollection(T).GetConsumingEnumerable() throw OperationCanceledException?

I'm using a BlockingCollection to implement a task scheduler, basically:

public class DedicatedThreadScheduler : TaskScheduler, IDisposable
{
    readonly BlockingCollection<Task> m_taskQueue = new BlockingCollection<Task>();

    readonly Thread m_thread;


    public DedicatedThreadScheduler()
    {
        m_thread = new Thread(() =>
        {
            foreach (var task in m_taskQueue.GetConsumingEnumerable())
            {
                TryExecuteTask(task);
            }
            m_taskQueue.Dispose();
        });
        m_thread.Start();
    }

    public void Dispose()
    {
        m_taskQueue.CompleteAdding();
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return Thread.CurrentThread == m_thread && TryExecuteTask(task);
    }

    (...)
}

I've only seen this once and couldn't reproduce it, but at some point the foreach (inside TryTakeWithNoTimeValidation) raised an OperationCanceledException. I don't understand why: I'm using the overload that doesn't take a CancellationToken, and the documentation states that it can only throw an ObjectDisposedException. What would the exception mean? That the blocking collection was finalized? That a task in the queue was cancelled?

Update: the call stack looks like this:

mscorlib.dll!System.Threading.SemaphoreSlim.WaitUntilCountOrTimeout(int millisecondsTimeout, uint startTime, System.Threading.CancellationToken cancellationToken) + 0x36 bytes 
mscorlib.dll!System.Threading.SemaphoreSlim.Wait(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) + 0x178 bytes   
System.dll!System.Collections.Concurrent.BlockingCollection<System.Threading.Tasks.Task>.TryTakeWithNoTimeValidation(out System.Threading.Tasks.Task item, int millisecondsTimeout, System.Threading.CancellationToken cancellationToken, System.Threading.CancellationTokenSource combinedTokenSource) Line 710 + 0x25 bytes   C#
System.dll!System.Collections.Concurrent.BlockingCollection<System.Threading.Tasks.Task>.GetConsumingEnumerable(System.Threading.CancellationToken cancellationToken) Line 1677 + 0x18 bytes    C#
Asik asked Apr 09 '14 15:04


1 Answer

This is an old question, but I'll add the full answer for anyone that finds it in the future. The answer provided by Eugene is partly correct; at the time you must have been debugging with Visual Studio configured to break on handled framework exceptions.

However, the actual reason you were breaking on an OperationCanceledException is that the code for BlockingCollection<T>.CompleteAdding() looks like this:

    public void CompleteAdding()
    {
        int num;
        this.CheckDisposed();
        if (this.IsAddingCompleted)
        {
            return;
        }
        SpinWait wait = new SpinWait();
    Label_0017:
        num = this.m_currentAdders;
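        // -2147483648 is int.MinValue (0x80000000): the high bit of m_currentAdders
        // flags "adding completed"; the remaining bits count in-flight adders.
        if ((num & -2147483648) != 0)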
        {
            wait.Reset();
            while (this.m_currentAdders != -2147483648)
            {
                wait.SpinOnce();
            }
        }
        else if (Interlocked.CompareExchange(ref this.m_currentAdders, num | -2147483648, num) == num)
        {
            wait.Reset();
            while (this.m_currentAdders != -2147483648)
            {
                wait.SpinOnce();
            }
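            // An empty, completed collection can never yield another item,
            // so wake up any consumers that are still blocked waiting for one.
            if (this.Count == 0)
            {
                this.CancelWaitingConsumers();
            }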
            this.CancelWaitingProducers();
        }
        else
        {
            wait.SpinOnce();
            goto Label_0017;
        }
    }

Notice these particular lines:

if (this.Count == 0)
{
    this.CancelWaitingConsumers();
}

which call this method:

private void CancelWaitingConsumers()
{
    this.m_ConsumersCancellationTokenSource.Cancel();
}

So even though you weren't explicitly using a CancellationToken in your code, the framework uses one internally. If the BlockingCollection is empty when CompleteAdding() is called, it cancels that internal token, and a consumer blocked inside GetConsumingEnumerable() (in SemaphoreSlim.Wait, as in your call stack) gets an OperationCanceledException. This is how the framework signals GetConsumingEnumerable() to exit. The exception is caught inside the framework code, and you wouldn't have noticed it if your debugger hadn't been configured to break on it.
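The behaviour described above can be observed without a debugger. The following is a minimal sketch, not framework code: the class name CompleteAddingDemo and the 200 ms sleep are my own choices for illustration, and the first-chance count it reports depends on the runtime's internal implementation matching the decompiled code shown earlier, so treat that number as an observation rather than a guarantee.

```csharp
using System;
using System.Collections.Concurrent;
using System.Runtime.ExceptionServices;
using System.Threading;

// Hypothetical demo class; the name and structure are illustrative only.
public static class CompleteAddingDemo
{
    // Returns whether the consuming loop ended without an exception reaching
    // user code, and how many first-chance OperationCanceledExceptions were seen.
    public static (bool LoopExitedNormally, int FirstChanceCancellations) Run()
    {
        int firstChance = 0;
        bool exitedNormally = false;

        // FirstChanceException fires for every thrown exception, handled or not,
        // which is roughly what a debugger set to "break when thrown" reacts to.
        EventHandler<FirstChanceExceptionEventArgs> handler = (sender, e) =>
        {
            if (e.Exception is OperationCanceledException)
                Interlocked.Increment(ref firstChance);
        };

        AppDomain.CurrentDomain.FirstChanceException += handler;
        try
        {
            using (var queue = new BlockingCollection<int>())
            {
                var consumer = new Thread(() =>
                {
                    // Blocks on the empty queue until CompleteAdding() is called.
                    foreach (var item in queue.GetConsumingEnumerable()) { }
                    exitedNormally = true;
                });
                consumer.Start();

                // Crude assumption: 200 ms is enough for the consumer to block.
                Thread.Sleep(200);

                queue.CompleteAdding(); // queue is empty, so waiting consumers are cancelled
                consumer.Join();
            }
        }
        finally
        {
            AppDomain.CurrentDomain.FirstChanceException -= handler;
        }

        return (exitedNormally, Volatile.Read(ref firstChance));
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine("Loop exited normally: " + result.LoopExitedNormally);
        Console.WriteLine("First-chance cancellations: " + result.FirstChanceCancellations);
    }
}
```

The point of the sketch is that the foreach simply ends: any OperationCanceledException raised along the way is visible only to first-chance observers such as the event handler above or a debugger configured to break on thrown exceptions.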

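The reason you couldn't reproduce it is most likely timing. The exception is only raised when CompleteAdding() runs while the queue is empty and the consumer thread is blocked waiting, and you placed that call in your Dispose() method, so it depends on when (and whether) the scheduler gets disposed. Note that the GC does not call Dispose() by itself; it only runs finalizers, so unless the elided part of your class has a finalizer that calls Dispose(), the call only happens when your own code disposes the scheduler.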
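If the goal is a predictable shutdown, one option is to dispose the scheduler explicitly and have Dispose() wait for the worker thread. The sketch below is an assumption-laden variant of the question's class, not the asker's actual code: JoiningThreadScheduler, SchedulerDemo, the m_disposed flag, and the bodies of QueueTask and GetScheduledTasks (elided as "(...)" in the question) are all hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical variant of the question's scheduler with deterministic shutdown.
public sealed class JoiningThreadScheduler : TaskScheduler, IDisposable
{
    private readonly BlockingCollection<Task> m_taskQueue = new BlockingCollection<Task>();
    private readonly Thread m_thread;
    private int m_disposed; // 0 = live, 1 = Dispose() already called

    public JoiningThreadScheduler()
    {
        m_thread = new Thread(() =>
        {
            // Ends once CompleteAdding() has been called and the queue is drained.
            foreach (var task in m_taskQueue.GetConsumingEnumerable())
            {
                TryExecuteTask(task);
            }
            m_taskQueue.Dispose();
        });
        m_thread.IsBackground = true;
        m_thread.Start();
    }

    // Assumed implementations; the question does not show these members.
    protected override void QueueTask(Task task) => m_taskQueue.Add(task);

    protected override IEnumerable<Task> GetScheduledTasks() => m_taskQueue.ToArray();

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return Thread.CurrentThread == m_thread && TryExecuteTask(task);
    }

    public void Dispose()
    {
        // Guard against double disposal: CompleteAdding() throws
        // ObjectDisposedException once the worker has disposed the queue.
        if (Interlocked.Exchange(ref m_disposed, 1) != 0) return;

        m_taskQueue.CompleteAdding();

        // Wait for queued tasks to finish, unless Dispose() was called from a
        // task running on the worker thread itself (joining would deadlock).
        if (Thread.CurrentThread != m_thread) m_thread.Join();
    }
}

public static class SchedulerDemo
{
    public static void Main()
    {
        int result;
        using (var scheduler = new JoiningThreadScheduler())
        {
            var task = Task.Factory.StartNew(
                () => 6 * 7, CancellationToken.None, TaskCreationOptions.None, scheduler);
            result = task.Result;
        } // Dispose() runs here: CompleteAdding(), then Join()
        Console.WriteLine(result); // prints 42
    }
}
```

With a using block, CompleteAdding() is called at a known point, so the internal cancellation (and any debugger break on it) happens at shutdown rather than at an unpredictable moment.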

0b101010 answered Oct 17 '22 09:10