Edit: The requirements of this question have changed. See the Update section below.
I have an async iterator method that produces an IAsyncEnumerable<int> (a stream of numbers), one number every 200 msec. The caller of this method consumes the stream, but wants to stop the enumeration after 1000 msec. So a CancellationTokenSource is used, and the token is passed as an argument to the WithCancellation extension method. But the token is not respected. The enumeration continues until all the numbers are consumed:
static async IAsyncEnumerable<int> GetSequence()
{
    for (int i = 1; i <= 10; i++)
    {
        await Task.Delay(200);
        yield return i;
    }
}

var cts = new CancellationTokenSource(1000);
await foreach (var i in GetSequence().WithCancellation(cts.Token))
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > {i}");
}
Output:
12:55:17.506 > 1
12:55:17.739 > 2
12:55:17.941 > 3
12:55:18.155 > 4
12:55:18.367 > 5
12:55:18.570 > 6
12:55:18.772 > 7
12:55:18.973 > 8
12:55:19.174 > 9
12:55:19.376 > 10
I expected a TaskCanceledException to be thrown after number 5. It seems that I have misunderstood what WithCancellation actually does. The method just passes the supplied token to the iterator method, if that method accepts one. Otherwise, as with the method GetSequence() in my example, the token is ignored. I suppose that the solution in my case is to check the token manually inside the body of the enumeration:
var cts = new CancellationTokenSource(1000);
await foreach (var i in GetSequence())
{
    cts.Token.ThrowIfCancellationRequested();
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > {i}");
}
This is simple and works well. But in any case I wonder if it would be possible to create an extension method that does what I expected WithCancellation to do: bake the token into the ensuing enumeration. This is the signature of the needed method:
public static IAsyncEnumerable<T> WithEnforcedCancellation<T>(
    this IAsyncEnumerable<T> source, CancellationToken cancellationToken)
{
    // Is it possible?
}
Update: It seems that when I asked this question I had an incorrect understanding of the purpose of the whole cancellation concept. I was under the impression that cancelling is intended for breaking the loop after the awaiting of MoveNextAsync, while the real purpose is to cancel the awaiting itself. In my trivial example the awaiting lasts only 200 msec, but in a real world example the awaiting could be much longer, even infinite. After realizing this, my question in its current form has almost no value, and I must either delete it and open a new one with the same title, or change the requirements of the existing question. Both options are bad in one way or another.
I decided to go with the second option. So I am un-accepting the currently accepted answer, and I am asking for a new solution to the more difficult problem of enforcing the cancellation in a way that has immediate effect. In other words, cancelling the token should result in the completion of the async enumeration in a matter of milliseconds. Let's give a practical example to distinguish between desirable and undesirable behavior:
var cts = new CancellationTokenSource(500);
var stopwatch = Stopwatch.StartNew();
try
{
    await foreach (var i in GetSequence().WithEnforcedCancellation(cts.Token))
    {
        Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > {i}");
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > Canceled");
}
Output (desirable):
0:00.242 > 1
0:00.467 > 2
0:00.500 > Canceled
Output (undesirable):
0:00.242 > 1
0:00.467 > 2
0:00.707 > Canceled
GetSequence is the same method as in the initial example, which streams one number every 200 msec. This method doesn't support cancellation, and the premise is that we can't change that. WithEnforcedCancellation is the required extension method that should fix this problem.
IAsyncEnumerable explicitly provides for this mechanism with the EnumeratorCancellation attribute:
static async IAsyncEnumerable<int> GetSequence([EnumeratorCancellation] CancellationToken ct = default) {
    for (int i = 1; i <= 10; i++) {
        ct.ThrowIfCancellationRequested();
        await Task.Delay(200); // or `Task.Delay(200, ct)` if this wasn't an example
        yield return i;
    }
}
In fact, the compiler is helpful enough to issue a warning if you give the method a CancellationToken parameter, but do not add the attribute.
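Note that the token passed to .WithCancellation does not simply replace a token passed directly to the method: if the method was called without a token, the .WithCancellation token is used as is, and if both are supplied, the compiler-generated code links them so that cancelling either one cancels the enumeration. The specs have the details on this.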
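To see where the token ends up, the await foreach can be expanded by hand. The following is a rough sketch of that expansion, not the exact compiler output: the token given to .WithCancellation is handed to GetAsyncEnumerator, and from there it reaches the parameter marked with [EnumeratorCancellation]. The Program class and the 500 msec timeout are assumptions made for the illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class Program
{
    // Cancellable variant of the iterator; the delay itself observes the token.
    public static async IAsyncEnumerable<int> GetSequence(
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        for (int i = 1; i <= 10; i++)
        {
            await Task.Delay(200, ct);
            yield return i;
        }
    }

    public static async Task Main()
    {
        using var cts = new CancellationTokenSource(500);

        // Hand-written approximation of:
        //   await foreach (var i in GetSequence().WithCancellation(cts.Token)) { ... }
        IAsyncEnumerator<int> enumerator =
            GetSequence().GetAsyncEnumerator(cts.Token);
        try
        {
            while (await enumerator.MoveNextAsync())
            {
                Console.WriteLine(enumerator.Current);
            }
        }
        catch (OperationCanceledException)
        {
            // Task.Delay throws TaskCanceledException, which derives from
            // OperationCanceledException.
            Console.WriteLine("Canceled");
        }
        finally
        {
            await enumerator.DisposeAsync();
        }
    }
}
```

Because the delay receives the token, the pending MoveNextAsync is interrupted as soon as the token fires, instead of at the next loop iteration.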
Of course, this will still only work if the enumeration actually accepts a CancellationToken, but the fact that cancellation only really works if done cooperatively is true of any async work. Yeldar's answer is good for "forcing" some measure of cancellation into an enumerable that doesn't support it, but the preferred solution should be to modify the enumeration to support cancellation by itself; the compiler does everything to help you out.
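For the case where the source really cannot be changed, here is a minimal sketch of how a forcing wrapper with the WithEnforcedCancellation signature from the question might be written. It is my own illustration under stated assumptions, not the code from the answer mentioned above. The idea is to race each MoveNextAsync against the token. The DisposeWhenIdle helper is hypothetical, and the sketch assumes it is acceptable that the abandoned MoveNextAsync keeps running in the background: the wrapper stops waiting for the source, it does not stop the source.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncEnumerableExtensions
{
    public static async IAsyncEnumerable<T> WithEnforcedCancellation<T>(
        this IAsyncEnumerable<T> source,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();

        // Completes as soon as the token is canceled.
        var canceled = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        using var registration = cancellationToken.Register(
            () => canceled.TrySetResult(true));

        var enumerator = source.GetAsyncEnumerator(cancellationToken);
        Task<bool> moveNext = Task.FromResult(true);
        try
        {
            while (true)
            {
                moveNext = enumerator.MoveNextAsync().AsTask();
                // Race the source against the token instead of waiting it out.
                var winner = await Task.WhenAny(moveNext, canceled.Task)
                    .ConfigureAwait(false);
                if (winner != moveNext)
                    throw new OperationCanceledException(cancellationToken);
                if (!await moveNext.ConfigureAwait(false)) yield break;
                yield return enumerator.Current;
            }
        }
        finally
        {
            // Compiler-generated iterators reject DisposeAsync while a
            // MoveNextAsync is in flight, so defer disposal in that case.
            if (moveNext.IsCompleted)
                await enumerator.DisposeAsync().ConfigureAwait(false);
            else
                _ = DisposeWhenIdle(enumerator, moveNext);
        }
    }

    // Hypothetical helper: waits for the abandoned call, then disposes.
    private static async Task DisposeWhenIdle<T>(
        IAsyncEnumerator<T> enumerator, Task pending)
    {
        try { await pending.ConfigureAwait(false); }
        catch { /* the result of the abandoned call is deliberately ignored */ }
        await enumerator.DisposeAsync().ConfigureAwait(false);
    }
}

public static class Program
{
    // The non-cancellable source from the question.
    public static async IAsyncEnumerable<int> GetSequence()
    {
        for (int i = 1; i <= 10; i++)
        {
            await Task.Delay(200);
            yield return i;
        }
    }

    public static async Task Main()
    {
        using var cts = new CancellationTokenSource(500);
        var stopwatch = Stopwatch.StartNew();
        try
        {
            await foreach (var i in GetSequence().WithEnforcedCancellation(cts.Token))
                Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > {i}");
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine($"{stopwatch.Elapsed:m':'ss'.'fff} > Canceled");
        }
    }
}
```

The trade-off is that any failure in the deferred disposal goes unobserved, and resources held by the source are released only after its pending step finishes, which is one reason to prefer cooperative cancellation when the source can be modified.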