I'm working with AsyncEnumerables to read a bunch of files into my C# application (.NET 6) and want to add an option to cancel reading. However, I'm confused by the behaviour of ToArrayAsync when provided with a cancellationToken.
I tried to break it down to this snippet:
var enumerable = Enumerable.Range(1, 10)
    .ToAsyncEnumerable()
    .SelectAwait(async x => { await Task.Delay(1000); return x; });
var cts = new CancellationTokenSource();
var resultTask = enumerable.ToArrayAsync(cts.Token);
cts.Cancel();
var z = await resultTask;
From my understanding, SelectAwait and ToArrayAsync should evaluate one entry after another (which is supported by timing it). I'd expect ToArrayAsync to check the provided token after every one of those steps. I tried looking up the source for it and think it boils down to this bit, which also seems to support my assumption:
public ValueTask<bool> MoveNextAsync()
{
    _cancellationToken.ThrowIfCancellationRequested();
    return _source.MoveNextAsync(); // REVIEW: Signal cancellation through task or synchronously?
}
However, in practice ToArrayAsync fully evaluates all entries of the enumerable (taking the full 10 seconds) and returns without a cancellation exception. Calling ToArrayAsync with an already cancelled token throws immediately, so presumably it only checks at the very start of its execution?
I know I can work around that by using SelectAwaitWithCancellation and checking the token inside its lambda, but I'm unsure about my understanding of ToArrayAsync and AsyncEnumerables in general, so I'd appreciate clarification on how it works and what the design thinking behind it is.
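For reference, the SelectAwaitWithCancellation workaround mentioned above might look roughly like the sketch below. It assumes the System.Linq.Async package is referenced and, instead of checking the token by hand, forwards it to Task.Delay so that the delay itself is cancellable. The `cancelled` flag and the console output are only there for illustration.

```csharp
// A sketch only; assumes the System.Linq.Async NuGet package is referenced.
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource();

// The selector receives the token that ToArrayAsync passes down to the
// enumerator, so the delay can observe the cancellation request.
var enumerable = Enumerable.Range(1, 10)
    .ToAsyncEnumerable()
    .SelectAwaitWithCancellation(async (x, token) =>
    {
        await Task.Delay(1000, token);
        return x;
    });

var resultTask = enumerable.ToArrayAsync(cts.Token);
cts.Cancel();

// Illustrative flag: records whether the await surfaced a cancellation.
bool cancelled = false;
try
{
    await resultTask;
}
catch (OperationCanceledException)
{
    cancelled = true;
}

Console.WriteLine($"cancelled: {cancelled}");
```

With the token forwarded like this, I would expect the await to fail with an OperationCanceledException shortly after Cancel is called, instead of running through all ten delays.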
This behavior stems from how the ToAsyncEnumerable operator is implemented in the System.Linq.Async package. This operator checks the CancellationToken passed to the GetAsyncEnumerator method only once. It is not rechecked in each MoveNextAsync operation. Here is a minimal demonstration of this behavior:
using CancellationTokenSource cts = new();
IAsyncEnumerable<int> sequence = Enumerable.Range(1, 10).ToAsyncEnumerable();
IAsyncEnumerator<int> enumerator = sequence.GetAsyncEnumerator(cts.Token);
cts.Cancel();
bool moved = await enumerator.MoveNextAsync();
Console.WriteLine($"moved: {moved}");
Output:
moved: True
In case you want a ToAsyncEnumerable operator that properly observes the cancellation token, here is one:
#pragma warning disable 1998
public static async IAsyncEnumerable<T> ToAsyncEnumerable2<T>(
#pragma warning restore 1998
    this IEnumerable<T> source,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(source);
    cancellationToken.ThrowIfCancellationRequested();
    using IEnumerator<T> enumerator = source.GetEnumerator();
    while (true)
    {
        cancellationToken.ThrowIfCancellationRequested();
        if (!enumerator.MoveNext()) break;
        yield return enumerator.Current;
    }
}
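As a usage sketch, here is the snippet from the question with ToAsyncEnumerable2 swapped in. The operator is repeated so that the example compiles on its own. The class name AsyncEnumerableExtensions, the `cancelled` flag and the stopwatch are illustrative assumptions, and the System.Linq.Async package is assumed to be referenced.

```csharp
// A sketch only; assumes the System.Linq.Async NuGet package is referenced.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource();

// Same pipeline as in the question, but built on the token-aware source.
var enumerable = Enumerable.Range(1, 10)
    .ToAsyncEnumerable2()
    .SelectAwait(async x => { await Task.Delay(1000); return x; });

var stopwatch = Stopwatch.StartNew();
var resultTask = enumerable.ToArrayAsync(cts.Token);
cts.Cancel();

// Illustrative flag: records whether the await surfaced a cancellation.
bool cancelled = false;
try
{
    await resultTask;
}
catch (OperationCanceledException)
{
    cancelled = true;
}

Console.WriteLine($"cancelled: {cancelled} after {stopwatch.Elapsed.TotalSeconds:F1}s");

// Hypothetical container class; extension methods need a static class.
public static class AsyncEnumerableExtensions
{
#pragma warning disable 1998
    public static async IAsyncEnumerable<T> ToAsyncEnumerable2<T>(
#pragma warning restore 1998
        this IEnumerable<T> source,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(source);
        using IEnumerator<T> enumerator = source.GetEnumerator();
        while (true)
        {
            // Checked before every element, not just once at the start.
            cancellationToken.ThrowIfCancellationRequested();
            if (!enumerator.MoveNext()) break;
            yield return enumerator.Current;
        }
    }
}
```

Because the Task.Delay inside SelectAwait does not take the token, the first delay should still run to completion. The cancellation should then be observed when the second element is requested, so the await is expected to throw after roughly one second rather than ten.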