Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Is there an example of Ix.NET (System.Interactive) somewhere?

I have an async method, say:

public async Task<T> GetAsync()
{

}

and would be called from:

public async Task<IEnumerable<T>> GetAllAsync()
{
    foreach (var item in something)
    {
        var result = await GetAsync();
        yield return result;
    }
}

The above syntax is not valid but basically I am after asynchronous generators. I know it can be handled via Observables. I did experiment with Rx.NET and it worked to some extent. But I am trying to avoid the complexity it brings to codebase, and more importantly the above requirement is still essentially not a reactive system (ours is still pull based). For e.g. I would only listen to the incoming async streams for a certain time and I have to stop the producer (not just unsubscribe the consumer) from the consumer side.

I can invert the method signature like this:

public IEnumerable<Task<T>> GetAllAsync()

But this makes doing LINQ operations bit tricky without blocking. I want it to be non-blocking as well as without loading the entire thing into memory. This library: AsyncEnumerable does exactly what I am looking for but how can the same be done with Ix.NET? They are meant for the same thing I believe.

In other words, how can I make use of Ix.NET to generate an IAsyncEnumerable when dealing with await? Like,

public async IAsyncEnumerable GetAllAsync()
{
    foreach (var item in something)
    {
        var result = await GetAsync();
        return // what?
    }
}
like image 716
nawfal Avatar asked Mar 08 '18 02:03

nawfal


1 Answers

(Edited)

Using System.Linq.Async 4.0.0 from NuGet, now you can use SelectAwait.

class Program
{
    static void Main(string[] args)
    {
        Task.Run(async () =>
            await GetAllAsync().ForEachAsync((x) => Console.WriteLine(x)));

        Thread.Sleep(4000);
    }

    static IAsyncEnumerable<string> GetAllAsync()
    {
        var something = new[] { 1, 2, 3 };

        return something
            .ToAsyncEnumerable()
            .SelectAwait(async (x) => await GetAsync(x));
    }

    static async Task<string> GetAsync(int item)
    {
        await Task.Delay(1000); // heavy
        return "got " + item;
    }
}

(Obsolete)

Using System.Interactive.Async 3.2.0 from NuGet, how about this? Currently Select() does not support async lambda, you have to implement it by yourself.

Better support for async - Task based overloads for AsyncEnumerable

class Program
{
    static void Main(string[] args)
    {
        Task.Run(async () =>
            await GetAllAsync().ForEachAsync((x) => Console.WriteLine(x)));

        Thread.Sleep(4000);
    }

    static IAsyncEnumerable<string> GetAllAsync()
    {
        var something = new[] { 1, 2, 3 };

        return something.SelectAsync(async (x) => await GetAsync(x));
    }

    static async Task<string> GetAsync(int item)
    {
        await Task.Delay(1000); // heavy
        return "got " + item;
    }
}

static class AsyncEnumerableExtensions
{
    public static IAsyncEnumerable<TResult> SelectAsync<T, TResult>(this IEnumerable<T> enumerable, Func<T, Task<TResult>> selector)
    {
        return AsyncEnumerable.CreateEnumerable(() =>
        {
            var enumerator = enumerable.GetEnumerator();
            var current = default(TResult);
            return AsyncEnumerable.CreateEnumerator(async c =>
                {
                    var moveNext = enumerator.MoveNext();
                    current = moveNext
                        ? await selector(enumerator.Current).ConfigureAwait(false)
                        : default(TResult);
                    return moveNext;
                },
                () => current,
                () => enumerator.Dispose());
        });
    }
}

The extension method is quoted from this sample. https://github.com/maca88/AsyncGenerator/issues/94#issuecomment-385286972

like image 170
cactuaroid Avatar answered Nov 13 '22 23:11

cactuaroid