I'm wondering if there is a way to create either an IAsyncEnumerable<T> or an IAsyncEnumerator<T> via a Source object, rather like TaskCompletionSource allows one to do for tasks. In particular, a TaskCompletionSource can be passed around like any other parameter.
Maybe something like this:
public class AsyncEnumerables {
    public Task HandlerTask { get; set; }

    public async Task<string> ParentMethod() {
        var source = new AsyncEnumerableSource<int>();
        IAsyncEnumerable<int> asyncEnumerable = source.GetAsyncEnumerable();
        HandlerTask = Task.Run(() => handleAsyncResultsAsTheyHappen(asyncEnumerable));
        int n = await someOtherTask();
        source.YieldReturn(n);
        var r = await ChildMethod(source);
        source.Complete(); // this call would cause the HandlerTask to complete.
        return r;
    }

    private async Task<string> ChildMethod(AsyncEnumerableSource<int> source) {
        source.YieldReturn(5);
        await SomeOtherCall();
        source.YieldReturn(10);
        return "hello";
    }
}
With the above code, the handleAsyncResultsAsTheyHappen task would see whatever values got passed into YieldReturn. So it would see the n from the above code, as well as the 5 and the 10 from ChildMethod.
Here is another implementation of the AsyncEnumerableSource class, one that doesn't depend on the Rx library. It depends instead on the Channel<T> class, which is natively available in the .NET standard libraries. It has identical behavior to the Rx-based implementation.
The AsyncEnumerableSource class can propagate notifications to multiple subscribers. Each subscriber can enumerate these notifications at its own pace. This is possible because each subscription has its own dedicated Channel<T> as underlying storage. The lifetime of a subscription is practically tied to the lifetime of a single await foreach loop. Breaking out of the loop early for any reason (including a thrown exception) immediately ends the subscription.
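To illustrate the "one channel per subscriber" idea in isolation, here is a minimal sketch that does not use the AsyncEnumerableSource class at all. The names FanOutDemo, ReadAll and RunAsync are made up for this illustration. Every value is written to every channel, and each reader drains its own channel at its own speed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

static class FanOutDemo
{
    // Drains one subscriber's channel, pausing after each item
    // to simulate a consumer that is slower than the producer.
    static async Task<List<int>> ReadAll(ChannelReader<int> reader, TimeSpan pause)
    {
        var seen = new List<int>();
        await foreach (var item in reader.ReadAllAsync())
        {
            seen.Add(item);
            await Task.Delay(pause);
        }
        return seen;
    }

    public static async Task<(List<int> Fast, List<int> Slow)> RunAsync()
    {
        // One dedicated channel per subscriber.
        var channels = new List<Channel<int>>
        {
            Channel.CreateUnbounded<int>(),
            Channel.CreateUnbounded<int>(),
        };
        Task<List<int>> fast = ReadAll(channels[0].Reader, TimeSpan.Zero);
        Task<List<int>> slow = ReadAll(channels[1].Reader, TimeSpan.FromMilliseconds(20));

        // Broadcast: each value goes into every subscriber's channel.
        foreach (int value in new[] { 1, 2, 3 })
            foreach (var channel in channels) channel.Writer.TryWrite(value);
        foreach (var channel in channels) channel.Writer.TryComplete();

        return (await fast, await slow);
    }

    static async Task Main()
    {
        var (fast, slow) = await RunAsync();
        Console.WriteLine(string.Join(",", fast)); // 1,2,3
        Console.WriteLine(string.Join(",", slow)); // 1,2,3
    }
}
```

Because the channels are unbounded, the slow reader never holds back the producer or the fast reader. The price is that a stalled subscriber's buffer can grow without limit.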
In technical terms, a new subscription is created the first time the MoveNextAsync method of an IAsyncEnumerator<T> is invoked. Calling the GetAsyncEnumerable method alone doesn't create a subscription, and neither does calling the GetAsyncEnumerator method. The subscription ends when the associated IAsyncEnumerator<T> is disposed.
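This laziness is a general property of C# async iterators, not something specific to this class. The sketch below uses a hypothetical Numbers iterator (the names LazyIteratorDemo, Numbers and RunAsync are made up for this illustration) to show that the iterator body starts running only on the first MoveNextAsync call.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static class LazyIteratorDemo
{
    static bool _bodyStarted;

    // Hypothetical iterator; setting the flag stands in for "subscribing".
    static async IAsyncEnumerable<int> Numbers()
    {
        _bodyStarted = true;
        await Task.Yield();
        yield return 1;
    }

    // Returns the flag's value after each of the three steps.
    public static async Task<bool[]> RunAsync()
    {
        _bodyStarted = false;
        IAsyncEnumerable<int> sequence = Numbers();
        bool afterCall = _bodyStarted;

        await using IAsyncEnumerator<int> enumerator = sequence.GetAsyncEnumerator();
        bool afterGetEnumerator = _bodyStarted;

        await enumerator.MoveNextAsync();
        bool afterMoveNext = _bodyStarted;

        return new[] { afterCall, afterGetEnumerator, afterMoveNext };
    }

    static async Task Main()
    {
        bool[] flags = await RunAsync();
        Console.WriteLine(string.Join(",", flags)); // False,False,True
    }
}
```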
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;

public class AsyncEnumerableSource<T>
{
    // One channel per active subscription. The list also serves as the lock object.
    private readonly List<Channel<T>> _channels = new();
    private bool _completed;
    private Exception _exception;

    public async IAsyncEnumerable<T> GetAsyncEnumerable(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        Channel<T> channel;
        lock (_channels)
        {
            // A source that has already finished rethrows or ends immediately.
            if (_exception != null) throw _exception;
            if (_completed) yield break;
            channel = Channel.CreateUnbounded<T>(
                new() { SingleWriter = true, SingleReader = true });
            _channels.Add(channel);
        }
        try
        {
            await foreach (var item in channel.Reader.ReadAllAsync()
                .WithCancellation(cancellationToken).ConfigureAwait(false))
            {
                yield return item;
                cancellationToken.ThrowIfCancellationRequested();
            }
        }
        // Runs when the enumerator is disposed, which ends the subscription.
        finally { lock (_channels) _channels.Remove(channel); }
    }

    // Broadcasts a value to all current subscribers.
    public void YieldReturn(T value)
    {
        lock (_channels)
        {
            if (_completed) return;
            foreach (var channel in _channels) channel.Writer.TryWrite(value);
        }
    }

    // Completes all current subscriptions successfully.
    public void Complete()
    {
        lock (_channels)
        {
            if (_completed) return;
            foreach (var channel in _channels) channel.Writer.TryComplete();
            _completed = true;
        }
    }

    // Completes all current subscriptions with an error.
    public void Fault(Exception error)
    {
        lock (_channels)
        {
            if (_completed) return;
            foreach (var channel in _channels) channel.Writer.TryComplete(error);
            _completed = true;
            _exception = error;
        }
    }
}
The cancellationToken.ThrowIfCancellationRequested(); call is there because of this issue: ChannelReader.ReadAllAsync(CancellationToken) not actually cancelled mid-iteration.
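The effect of that explicit check can be seen with a small self-contained sketch. The names PromptCancellationDemo, ReadWithCheck and RunAsync are made up for this illustration. Three items are buffered up front, and the consumer cancels the token while handling the first one. Because the wrapper checks the token right after each yield return, the loop stops before the second item is delivered, even though that item is already sitting in the channel.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

static class PromptCancellationDemo
{
    // Same pattern as in GetAsyncEnumerable: check the token after every item.
    static async IAsyncEnumerable<T> ReadWithCheck<T>(ChannelReader<T> reader,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (var item in reader.ReadAllAsync(cancellationToken)
            .ConfigureAwait(false))
        {
            yield return item;
            cancellationToken.ThrowIfCancellationRequested();
        }
    }

    public static async Task<List<int>> RunAsync()
    {
        // Buffer three items before anyone starts reading.
        var channel = Channel.CreateUnbounded<int>();
        for (int i = 1; i <= 3; i++) channel.Writer.TryWrite(i);
        channel.Writer.Complete();

        using var cts = new CancellationTokenSource();
        var seen = new List<int>();
        try
        {
            await foreach (var item in ReadWithCheck(channel.Reader, cts.Token))
            {
                seen.Add(item);
                cts.Cancel(); // cancel while the first item is being handled
            }
        }
        catch (OperationCanceledException) { }
        return seen;
    }

    static async Task Main()
    {
        List<int> seen = await RunAsync();
        Console.WriteLine(string.Join(",", seen)); // 1
    }
}
```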
Caution: if you start propagating values with YieldReturn before any consumer has subscribed to the AsyncEnumerableSource, these values are going to be lost. No subscriber is going to observe them. To prevent this scenario, make sure that all consumers have subscribed before starting the producers. The easiest way to do this is for the consumers to be async methods, with the await foreach being the first await inside the async method:
// Correct, synchronous subscription
async Task Consume()
{
    await foreach (var item in source.GetAsyncEnumerable())
    {
        //...
    }
}
Task consumer = Consume();
Avoid the temptation to use the Task.Run method, because in this case the subscription will occur asynchronously on a ThreadPool thread, and not synchronously with the creation of the consumer:
// Wrong, delayed subscription (possibility for unobserved values)
Task consumer = Task.Run(async () =>
{
    await foreach (var item in source.GetAsyncEnumerable())
    {
        //...
    }
});
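The direct call subscribes synchronously because an async method runs on the caller's thread until it reaches its first incomplete await. The sketch below demonstrates this with a hypothetical Subscribe iterator standing in for source.GetAsyncEnumerable(). The names SubscriptionTimingDemo, Subscribe and RunAsync are made up for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static class SubscriptionTimingDemo
{
    static bool _subscribed;
    static TaskCompletionSource<bool> _gate;

    // Hypothetical stand-in for source.GetAsyncEnumerable():
    // setting the flag represents registering the subscriber's channel.
    static async IAsyncEnumerable<int> Subscribe()
    {
        _subscribed = true;
        await _gate.Task; // waits for "items" until the gate is opened
        yield break;
    }

    static async Task Consume()
    {
        // The await foreach is the first await in the method.
        await foreach (var item in Subscribe())
        {
            //...
        }
    }

    public static async Task<bool> RunAsync()
    {
        _subscribed = false;
        _gate = new TaskCompletionSource<bool>();

        Task consumer = Consume();               // not awaited yet
        bool subscribedOnReturn = _subscribed;   // already true at this point

        _gate.SetResult(true);                   // let the consumer finish
        await consumer;
        return subscribedOnReturn;
    }

    static async Task Main()
    {
        Console.WriteLine(await RunAsync()); // True
    }
}
```

With Task.Run there is no such guarantee: the lambda is only queued to the ThreadPool, so the flag may or may not be set by the time Task.Run returns.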
If you don't want the subscriptions to happen synchronously, you can offload them to the ThreadPool and await until they are established before starting the producers:
// Correct, awaited subscription
Task consumer = await Task.Factory.StartNew(async () =>
{
    HeavySynchronousComputation();
    await foreach (var item in source.GetAsyncEnumerable())
    {
        //...
    }
}, default, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
When given an async lambda, the Task.Factory.StartNew method creates a nested Task<Task>. The outer task completes as soon as the lambda reaches its first incomplete await, so it represents the subscription. The inner task represents the consuming loop.
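The relationship between the two tasks can be observed with a small sketch, in which a TaskCompletionSource takes the place of the consuming loop. The names NestedTaskDemo, RunAsync and gate are made up for this illustration.

```csharp
using System;
using System.Threading.Tasks;

static class NestedTaskDemo
{
    // Returns whether the inner task was complete (a) right after the outer
    // task was awaited and (b) after the lambda was allowed to finish.
    public static async Task<(bool Before, bool After)> RunAsync()
    {
        var gate = new TaskCompletionSource<bool>();

        Task<Task> outer = Task.Factory.StartNew(async () =>
        {
            // The synchronous part (the subscription, in the answer's code) runs here.
            await gate.Task; // stands in for the consuming loop
        }, default, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);

        // The outer task completes once the lambda hits its first incomplete await.
        Task inner = await outer;
        bool before = inner.IsCompleted;

        gate.SetResult(true); // let the "loop" finish
        await inner;
        bool after = inner.IsCompleted;

        return (before, after);
    }

    static async Task Main()
    {
        var (before, after) = await RunAsync();
        Console.WriteLine($"{before},{after}"); // False,True
    }
}
```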