Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Enforcing one async observable at a time

I'm trying to integrate some TPL async into a larger Rx chain using Observable.FromAsync, like in this small example:

using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

namespace rxtest
{
    class Program
    {
        static void Main(string[] args)
        {
            MainAsync().Wait();
        }

        static async Task MainAsync()
        {
            await Observable.Generate(new Random(), x => true,
                                      x => x, x => x.Next(250, 500))
                .SelectMany((x, idx) => Observable.FromAsync(async ct =>
                {
                    Console.WriteLine("start:  " + idx.ToString());
                    await Task.Delay(x, ct);
                    Console.WriteLine("finish: " + idx.ToString());
                    return idx;
                }))
                .Take(10)
                .LastOrDefaultAsync();
        }
    }
}

However, I've noticed that this seems to start all the async tasks concurrently rather than doing them one at a time, which causes memory usage of the app to balloon. The SelectMany appears to be acting no different than a Merge.

Here, I see output like this:

start:  0
start:  1
start:  2
...

I'd like to see:

start:  0
finish: 0
start:  1
finish: 1
start:  2
finish: 2
...

How can I achieve this?

like image 759
Cory Nelson Avatar asked Jan 12 '23 18:01

Cory Nelson


1 Answers

Change the SelectMany to a Select with a Concat:

    static async Task MainAsync()
    {
        await Observable.Generate(new Random(), x => true,
                                  x => x, x => x.Next(250, 500))
            .Take(10)
            .Select((x, idx) => Observable.FromAsync(async ct =>
            {
                Console.WriteLine("start:  " + idx.ToString());
                await Task.Delay(x, ct);
                Console.WriteLine("finish: " + idx.ToString());
                return idx;
            }))
            .Concat()
            .LastOrDefaultAsync();
    }

EDIT - I moved the Take(10) up the chain because the Generate won't block - so it stops this running away.

The Select projects each event into a stream representing an async task that will start on Subscription. Concat accepts a stream of streams and subscribes to each successive sub-stream when the previous has completed, concatenating all the streams into a single flat stream.

like image 56
James World Avatar answered Jan 19 '23 07:01

James World