
How to cancel GetConsumingEnumerable() on BlockingCollection

In the following code I'm using a CancellationToken to wake up GetConsumingEnumerable() when the producer is not producing, so that I can break out of the foreach and exit the Task. But I don't see IsCancellationRequested being logged, and my Task.Wait(timeOut) waits for the full timeOut period. What am I doing wrong?

userToken.Task = Task.Factory.StartNew(state =>
{
    userToken.CancelToken = new CancellationTokenSource();

    foreach (var broadcast in userToken.BroadcastQueue.GetConsumingEnumerable(userToken.CancelToken.Token))
    {
        if (userToken.CancelToken.IsCancellationRequested)
        {
            Log.Write("BroadcastQueue IsCancellationRequested");
            break;
            ...
        }
    }

    return 0;
}, "TaskSubscribe", TaskCreationOptions.LongRunning);

later...

UserToken.CancelToken.Cancel();          
try
{
    task.Wait(timeOut);
}
catch (AggregateException ar)
{
    Log.Write("AggregateException " + ar.InnerException, MsgType.InfoMsg);
}
catch (OperationCanceledException)
{
    Log.Write("BroadcastQueue Cancelled", MsgType.InfoMsg);
}
Spud asked Feb 02 '12 02:02


2 Answers

You could use CompleteAdding() which signifies that no more items will be added to the collection. If GetConsumingEnumerable is used, the foreach will end gracefully as it will know there's no point in waiting for more items.

Basically once you have finished adding items to the BlockingCollection just do:

myBlockingCollection.CompleteAdding();

Any thread that is running a foreach loop over GetConsumingEnumerable will stop looping once the items already in the collection have been consumed.
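A minimal sketch of that behaviour, assuming a single producer and a single consumer. The names CompleteAddingDemo and Run are hypothetical and only serve the illustration; the consumer loop ends by itself after CompleteAdding() is called and the queued items are drained.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class CompleteAddingDemo
{
    // Hypothetical helper: consumes the queue on a background task
    // and returns the items the consumer saw.
    public static List<string> Run()
    {
        var queue = new BlockingCollection<string>();
        var seen = new List<string>();

        var consumer = Task.Factory.StartNew(() =>
        {
            // Blocks while the queue is empty; ends once adding is
            // completed and every remaining item has been taken.
            foreach (var item in queue.GetConsumingEnumerable())
            {
                seen.Add(item);
            }
        }, TaskCreationOptions.LongRunning);

        queue.Add("first");
        queue.Add("second");

        // Signals that no more items will arrive, so the foreach can finish.
        queue.CompleteAdding();

        consumer.Wait();
        return seen;
    }
}
```

Calling CompleteAddingDemo.Run() should return the two items in the order they were added. No CancellationToken is involved here, so no OperationCanceledException has to be handled.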

Anthony answered Nov 12 '22 03:11


I've created a quick prototype, and it seems to work for me.

Note the Thread.Sleep(1000) right before the cancel request. It is possible that you have a race condition on the CancelToken variable, since you create userToken.CancelToken in one thread and access it from another.

For example, the code that is meant to cancel the task might call Cancel() on the wrong (previous or null) CancellationTokenSource.

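using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        CancellationTokenSource token = null;
        BlockingCollection<string> coll = new BlockingCollection<string>();
        var t = Task.Factory.StartNew(state =>
        {
            // Assigned on the worker thread, so Main may still see null
            // if it does not wait long enough before calling Cancel().
            token = new CancellationTokenSource();
            try
            {
                // Blocks while the collection is empty; cancelling the token
                // makes the enumeration throw OperationCanceledException.
                foreach (var broadcast in coll.GetConsumingEnumerable(token.Token))
                {
                    if (token.IsCancellationRequested)
                    {
                        return;
                    }
                }
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Cancel");
                return;
            }
        }, "TaskSubscribe", TaskCreationOptions.LongRunning);
        // Gives the task time to create the token source before it is cancelled.
        Thread.Sleep(1000);
        token.Cancel();
        t.Wait();
    }
}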
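One way to avoid the race described above, sketched here as an assumption rather than as the asker's actual fix, is to create the CancellationTokenSource before the task starts, so the cancelling thread and the consumer always share the same instance. The names RaceFreeCancelDemo and Run are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class RaceFreeCancelDemo
{
    // Hypothetical helper: returns true if the consumer left the loop
    // because the token was cancelled.
    public static bool Run()
    {
        var queue = new BlockingCollection<string>();
        var cancelled = false;

        // Created before the task starts, so it can never be null or stale
        // when Cancel() is called from this thread.
        using (var cts = new CancellationTokenSource())
        {
            var consumer = Task.Factory.StartNew(() =>
            {
                try
                {
                    foreach (var item in queue.GetConsumingEnumerable(cts.Token))
                    {
                        Console.WriteLine(item);
                    }
                }
                catch (OperationCanceledException)
                {
                    // Cancellation surfaces here, not inside the loop body.
                    cancelled = true;
                }
            }, TaskCreationOptions.LongRunning);

            cts.Cancel();
            consumer.Wait();
        }

        return cancelled;
    }
}
```

With an empty queue, the enumeration throws OperationCanceledException when the token is cancelled, so an IsCancellationRequested check inside the loop body is not reached. No Thread.Sleep is needed, because no thread has to wait for the other to assign the variable.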
Valera Kolupaev answered Nov 12 '22 05:11