
How to force an IAsyncEnumerable to respect a CancellationToken

Edit: The requirements of this question have changed. See the Update section below.

I have an async iterator method that produces an IAsyncEnumerable<int> (a stream of numbers), one number every 200 msec. The caller consumes the stream but wants to stop the enumeration after 1000 msec, so it creates a CancellationTokenSource and passes the token to the WithCancellation extension method. The token is not respected, though. The enumeration continues until all the numbers are consumed:

static async IAsyncEnumerable<int> GetSequence()
{
    for (int i = 1; i <= 10; i++)
    {
        await Task.Delay(200);
        yield return i;
    }
}

var cts = new CancellationTokenSource(1000);
await foreach (var i in GetSequence().WithCancellation(cts.Token))
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > {i}");
}

Output:

12:55:17.506 > 1
12:55:17.739 > 2
12:55:17.941 > 3
12:55:18.155 > 4
12:55:18.367 > 5
12:55:18.570 > 6
12:55:18.772 > 7
12:55:18.973 > 8
12:55:19.174 > 9
12:55:19.376 > 10

I expected a TaskCanceledException to be thrown after number 5. It seems I misunderstood what WithCancellation actually does: it only passes the supplied token to the iterator method, if that method accepts one. Otherwise, as with GetSequence() in my example, the token is ignored. I suppose the solution in my case is to check the token manually inside the body of the enumeration:

var cts = new CancellationTokenSource(1000);
await foreach (var i in GetSequence())
{
    cts.Token.ThrowIfCancellationRequested();
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > {i}");
}

This is simple and works well. Still, I wonder whether it is possible to write an extension method that does what I expected WithCancellation to do, that is, to bake the token into the resulting enumeration. This is the signature of the method I need:

public static IAsyncEnumerable<T> WithEnforcedCancellation<T>(
    this IAsyncEnumerable<T> source, CancellationToken cancellationToken)
{
    // Is it possible?
}

Update: When I asked this question I misunderstood the purpose of the whole cancellation concept. I was under the impression that cancelling is meant to break the loop after MoveNextAsync has been awaited, while its real purpose is to cancel the awaiting itself. In my trivial example the awaiting lasts only 200 msec, but in a real-world scenario it could last much longer, even forever. Having realized this, my question in its current form has almost no value, and I must either delete it and open a new one with the same title, or change the requirements of the existing question. Both options are bad in one way or another.

I decided to go with the second option. So I am un-accepting the currently accepted answer, and I am asking for a new solution to the more difficult problem of enforcing the cancellation in a way that takes effect immediately. In other words, cancelling the token should result in the completion of the async enumeration within milliseconds. Let's take a practical example to distinguish between desirable and undesirable behavior:

var cts = new CancellationTokenSource(500);
var stopwatch = Stopwatch.StartNew();
try
{
    await foreach (var i in GetSequence().WithEnforcedCancellation(cts.Token))
    {
        Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > {i}");
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > Canceled");
}

Output (desirable):

0:00.242 > 1
0:00.467 > 2
0:00.500 > Canceled

Output (undesirable):

0:00.242 > 1
0:00.467 > 2
0:00.707 > Canceled

GetSequence is the same method as in the initial example: it streams one number every 200 msec. It doesn't support cancellation, and the premise is that we can't change that. WithEnforcedCancellation is the extension method I am asking for, which should fix this problem.

Theodor Zoulias asked Oct 04 '19 10:10



1 Answer

IAsyncEnumerable explicitly provides for this mechanism with the EnumeratorCancellation attribute:

// Requires: using System.Runtime.CompilerServices; (for [EnumeratorCancellation])
static async IAsyncEnumerable<int> GetSequence([EnumeratorCancellation] CancellationToken ct = default) {
    for (int i = 1; i <= 10; i++) {
        ct.ThrowIfCancellationRequested();
        await Task.Delay(200);    // or `Task.Delay(200, ct)` if this wasn't an example
        yield return i;
    }
}

In fact, the compiler is helpful enough to issue a warning if you give the method a CancellationToken parameter, but do not add the attribute.

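Note that if one token is passed directly to the method and another to .WithCancellation, the second does not replace the first: the compiler-generated enumerator links the two, so cancelling either one cancels the enumeration. The specs have the details on this.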
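
A quick way to check how the two tokens interact is to pass one token directly and a different one through WithCancellation, then cancel only one of them. The sketch below does that. TokenInteractionDemo and WasCanceledAsync are made-up names for illustration, and Task.Delay is given the token here so that a cancellation shows up without waiting for the next loop iteration.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class TokenInteractionDemo
{
    // Cancellable variant of GetSequence: the token also reaches Task.Delay.
    static async IAsyncEnumerable<int> GetSequence(
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        for (int i = 1; i <= 10; i++)
        {
            await Task.Delay(200, ct);
            yield return i;
        }
    }

    // Hypothetical helper: returns true if the enumeration ended with an
    // OperationCanceledException, false if it ran to completion.
    public static async Task<bool> WasCanceledAsync(
        CancellationToken direct, CancellationToken viaWithCancellation)
    {
        try
        {
            await foreach (var item in GetSequence(direct)
                .WithCancellation(viaWithCancellation))
            {
                // Items are ignored; only the way the loop ends matters here.
            }
            return false;
        }
        catch (OperationCanceledException)
        {
            return true;
        }
    }
}
```

Calling WasCanceledAsync once with only the first token set to fire, and once with only the second, shows whether either token on its own is enough to stop the enumeration. As far as I understand the async-streams design, both calls should report a cancellation.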

Of course, this still only works if the enumeration actually accepts a CancellationToken, but the fact that cancellation only really works when done cooperatively is true of any async work. Yeldar's answer is good for "forcing" some measure of cancellation into an enumerable that doesn't support it, but the preferred solution is to modify the enumeration to support cancellation itself; the compiler does everything it can to help you out.
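
For the case where the iterator really cannot be changed, a wrapper along the following lines is one way to get the immediate effect the updated question asks for. This is only a sketch and not the approach from any answer quoted here: it races each MoveNextAsync against a task that is canceled when the token fires. EnforcedCancellationSketch, DisposeLaterAsync and RunAsync are names I made up. The abandoned MoveNextAsync is not stopped; it keeps running in the background, and the source enumerator is disposed only after that step finishes.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class EnforcedCancellationSketch
{
    public static async IAsyncEnumerable<T> WithEnforcedCancellation<T>(
        this IAsyncEnumerable<T> source,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        if (source is null) throw new ArgumentNullException(nameof(source));
        cancellationToken.ThrowIfCancellationRequested();

        // A task that completes as canceled as soon as the token fires.
        var canceled = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        using var registration = cancellationToken.Register(
            () => canceled.TrySetCanceled(cancellationToken));

        // The token is still forwarded, in case the source does honour it.
        var enumerator = source.GetAsyncEnumerator(cancellationToken);
        Task<bool> pending = Task.FromResult(true);
        try
        {
            while (true)
            {
                pending = enumerator.MoveNextAsync().AsTask();
                // Race the source's next step against the token.
                var winner = await Task.WhenAny(pending, canceled.Task)
                    .ConfigureAwait(false);
                // Awaiting the winner gives the MoveNextAsync result, or rethrows
                // the cancellation (or an exception thrown by the source).
                if (!await winner.ConfigureAwait(false)) yield break;
                yield return enumerator.Current;
            }
        }
        finally
        {
            if (pending.IsCompleted)
                await enumerator.DisposeAsync().ConfigureAwait(false);
            else
                // Compiler-generated enumerators must not be disposed while a
                // MoveNextAsync is in flight, so defer the disposal.
                _ = DisposeLaterAsync(pending, enumerator);
        }
    }

    // Fire-and-forget cleanup; errors are swallowed because nobody observes them.
    private static async Task DisposeLaterAsync<T>(Task step, IAsyncEnumerator<T> e)
    {
        try { await step.ConfigureAwait(false); } catch { }
        try { await e.DisposeAsync().ConfigureAwait(false); } catch { }
    }

    // The non-cancellable iterator from the question.
    public static async IAsyncEnumerable<int> GetSequence()
    {
        for (int i = 1; i <= 10; i++)
        {
            await Task.Delay(200);
            yield return i;
        }
    }

    // Hypothetical helper: enumerates with a timeout and reports what happened.
    public static async Task<(List<int> Items, bool Canceled, TimeSpan Elapsed)>
        RunAsync(int cancelAfterMs)
    {
        using var cts = new CancellationTokenSource(cancelAfterMs);
        var stopwatch = Stopwatch.StartNew();
        var items = new List<int>();
        var wasCanceled = false;
        try
        {
            await foreach (var i in GetSequence().WithEnforcedCancellation(cts.Token))
                items.Add(i);
        }
        catch (OperationCanceledException)
        {
            wasCanceled = true;
        }
        return (items, wasCanceled, stopwatch.Elapsed);
    }
}
```

With RunAsync(500) the loop should end with an OperationCanceledException shortly after the timeout instead of waiting for the next number. The trade-off is that the wrapper only stops observing the source; whatever work the source was doing at that moment continues until it reaches its next yield.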

Jeroen Mostert answered Nov 01 '22 09:11
