Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Factory for IAsyncEnumerable or IAsyncEnumerator

I'm wondering if there is a way to create either IAsyncEnumerable<T> or IAsyncEnumerator<T> via a Source object, rather like TaskCompletionSource allows one to do for tasks. In particular, TaskCompletionSource can be passed around like any other parameter.

Maybe something like this:

public class AsyncEnumerables {

    public Task HandlerTask { get; set; }

    public async Task<string> ParentMethod() {
        var source = new AsyncEnumerableSource<int>();
        IAsyncEnumerable asyncEnumerable = source.GetAsyncEnumerable();
        HandlerTask = Task.Run(() => handleAsyncResultsAsTheyHappen(asyncEnumerable));
        int n = await someOtherTask();
        source.YieldReturn(n);
        var r = await ChildMethod(source);
        source.Complete();  // this call would cause the HandlerTask to complete.
        return r;
    }

    private async Task<string> ChildMethod(AsyncEnumerableSource<int> source) {
        source.YieldReturn(5);
        await SomeOtherCall();
        source.YieldReturn(10);
        return "hello";
    }
}

With the above code, the handleAsyncResultsAsTheyHappen task would see whatever values got passed into YieldReturn. So it would see the n from the above code, as well as the 5 and the 10 from ChildMethod.

like image 877
William Jockusch Avatar asked Mar 02 '23 12:03

William Jockusch


1 Answers

Here is another implementation of the AsyncEnumerableSource class, that doesn't depend on the Rx library. This one depends instead on the Channel<T>, class, which is natively available in the .NET standard libraries. It has identical behavior to the Rx-based implementation.

The class AsyncEnumerableSource can propagate notifications to multiple subscribers. Each subscriber can enumerate these notifications at its own pace. This is possible because each subscription has its own dedicated Channel<T> as underlying storage. The lifetime of a subscription is practically tied to the lifetime of a single await foreach loop. Breaking early from a loop for any reason (including thrown exceptions), ends immediately the subscription.

In technical terms a new subscription is created the first time that the MoveNextAsync method of an IAsyncEnumerator<T> is invoked. Calling the method GetAsyncEnumerable alone doesn't create a subscription, nor calling the GetAsyncEnumerator method does. The subscription ends when the associated IAsyncEnumerator<T> is disposed.

public class AsyncEnumerableSource<T>
{
    private readonly List<Channel<T>> _channels = new();
    private bool _completed;
    private Exception _exception;

    public async IAsyncEnumerable<T> GetAsyncEnumerable(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        Channel<T> channel;
        lock (_channels)
        {
            if (_exception != null) throw _exception;
            if (_completed) yield break;
            channel = Channel.CreateUnbounded<T>(
                new() { SingleWriter = true, SingleReader = true });
            _channels.Add(channel);
        }
        try
        {
            await foreach (var item in channel.Reader.ReadAllAsync()
                .WithCancellation(cancellationToken).ConfigureAwait(false))
            {
                yield return item;
                cancellationToken.ThrowIfCancellationRequested();
            }
        }
        finally { lock (_channels) _channels.Remove(channel); }
    }

    public void YieldReturn(T value)
    {
        lock (_channels)
        {
            if (_completed) return;
            foreach (var channel in _channels) channel.Writer.TryWrite(value);
        }
    }

    public void Complete()
    {
        lock (_channels)
        {
            if (_completed) return;
            foreach (var channel in _channels) channel.Writer.TryComplete();
            _completed = true;
        }
    }

    public void Fault(Exception error)
    {
        lock (_channels)
        {
            if (_completed) return;
            foreach (var channel in _channels) channel.Writer.TryComplete(error);
            _completed = true;
            _exception = error;
        }
    }
}

The reason for the cancellationToken.ThrowIfCancellationRequested(); is because of this issue: ChannelReader.ReadAllAsync(CancellationToken) not actually cancelled mid-iteration.

Caution: in case you start propagating values with YieldReturn before any consumer has subscribed to the AsyncEnumerableSource, these values are going to be lost. No subscriber is going to observe them. To prevent this scenario you should make sure that all consumers have subscribed before starting the producers. The easiest way to do it is for the consumers to be async methods, with the await foreach being the first await inside the async method:

// Correct, synchronous subscription
async Task Consume()
{
    await foreach (var item in source.GetAsyncEnumerable())
    {
        //...
    }
}
Task consumer = Consume();

Avoid the temptation to use the Task.Run method, because in this case the subscription will occur asynchronously on a ThreadPool thread, and not synchronously with the creation of the consumer:

// Wrong, delayed subscription (possibility for unobserved values)
Task consumer = Task.Run(async () =>
{
    await foreach (var item in source.GetAsyncEnumerable())
    {
        //...
    }
});

In case that you don't want to do the subscriptions synchronously, it is possible to offload them to the ThreadPool, and await them to be established before starting the producers:

// Correct, awaited subscription
Task consumer = await Task.Factory.StartNew(async () =>
{
    HeavySynchronousComputation();
    await foreach (var item in source.GetAsyncEnumerable())
    {
        //...
    }
}, default, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);

The Task.Factory.StartNew(async method creates a nested Task<Task>. The outer task represents the subscription, and the inner task represents the consuming loop.

like image 103
Theodor Zoulias Avatar answered Mar 09 '23 07:03

Theodor Zoulias