
How to generate an observable from a starting observable, when I can only generate other observables?

I want to generate an observable where each value of the observable is dependent on the one before it, starting from a single value. If I have a simple transformation between values like Func<int, int>, it is easy to do with Observable.Generate like so:

Func<int, IObservable<int>> mkInts = init =>
    Observable.Generate(
        init,         // start value
        _ => true,    // continue ?
        i => i + 1,   // transformation function
        i => i);      // result selector

using (mkInts(1).Subscribe(Console.WriteLine))
{
    Console.ReadLine();
}

This will happily write numbers on my screen until I press Enter. However, my transformation function does some network IO, so its type is Func<int, IObservable<int>> and I cannot use that approach. Instead, I have tried this:

// simulate my transformation function
Func<int, IObservable<int>> mkInt = ts =>
    Observable.Return(ts)
              .Delay(TimeSpan.FromMilliseconds(10));

// pre-assign my generator function, since the function calls itself recursively
Func<int, IObservable<int>> mkInts = null;

// my generator function
mkInts = init =>
{
    var ints = mkInt(init); 

    // here is where I depend on the previous value.
    var nextInts = ints.SelectMany(i => mkInts(i + 1)); 
    return ints.Concat(nextInts);
};

using (mkInts(1).Subscribe(Console.WriteLine))
{
    Console.ReadLine();
}

But this throws a StackOverflowException after printing about 5000 numbers. How can I solve this?

Boris asked Feb 24 '23 01:02



1 Answer

I think I've got a nice clean solution for you.

First up, go back to using a Func<int, int>. It can easily be turned into a Func<int, IObservable<int>> using Observable.FromAsyncPattern.
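As a side note, delegate BeginInvoke is not supported on .NET Core and later, so FromAsyncPattern with fn.BeginInvoke only suits the classic .NET Framework. The following is a minimal sketch of an alternative conversion, assuming the System.Reactive package and using Observable.Start instead. The helper name ToObservableFunc and the class Demo are hypothetical and not part of the original answer.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

static class Demo
{
    // Hypothetical helper (an assumption, not from the original answer):
    // wraps a blocking function so that each call runs on the default
    // scheduler and yields its single result through an observable.
    public static Func<int, IObservable<int>> ToObservableFunc(Func<int, int> fn) =>
        x => Observable.Start(() => fn(x));

    static void Main()
    {
        // Stand-in for a slow, blocking transformation.
        Func<int, int> slowIncrement = x =>
        {
            Thread.Sleep(100);
            return x + 1;
        };

        Func<int, IObservable<int>> ofn = ToObservableFunc(slowIncrement);

        // Wait() blocks until the observable produces its last value.
        Console.WriteLine(ofn(7).Wait()); // prints 8
    }
}
```

Note that Observable.Start begins the work as soon as the wrapped function is called, not when the result is subscribed to, which is fine here because each result is consumed immediately.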

I used this for testing:

Func<int, int> mkInt = ts =>
{
    Thread.Sleep(100);
    return ts + 1;
};

Now here's the money maker:

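Func<int, Func<int, int>, IObservable<int>> mkInts = (i0, fn) =>
    Observable.Create<int>(o =>
    {
        // wrap the synchronous function as an asynchronous observable call
        var ofn = Observable
            .FromAsyncPattern<int, int>(
                fn.BeginInvoke,
                fn.EndInvoke);

        // the subject carries every generated value
        var s = new Subject<int>();

        // each value in the subject triggers the next calculation
        var q = s.Select(x => ofn(x)).Switch();

        var r = new CompositeDisposable(new IDisposable[]
        {
            q.Subscribe(s),   // feed each result back into the subject
            s.Subscribe(o),   // forward every value to the subscriber
        });

        // kick off the loop with the start value
        s.OnNext(i0);

        return r;
    });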

The iterating function is turned into an asynchronous observable.

The q variable feeds the values from the subject into the observable iterating function and selects the calculated observable. The Switch method flattens out the result and ensures that each call to the observable iterating function is properly cleaned up.

Also, the use of a CompositeDisposable allows the two subscriptions to be disposed of as one. Very neat!

It's easily used like this:

using (mkInts(7, mkInt).Subscribe(Console.WriteLine))
{
    Console.ReadLine();
}

Now you have a fully parametrized version of your generator function. Nice, huh?
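The same feedback loop can also be sketched for a step function that already returns an observable, which is the shape the question started from. This is a sketch under stated assumptions rather than a definitive implementation: it assumes the System.Reactive package, the names FeedbackLoop, Iterate, seed and step are hypothetical, and the step function is simulated with Observable.Return plus Delay as in the question.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class FeedbackLoop
{
    // Hypothetical helper: emits seed, then step(seed), then step(step(seed)), ...
    public static IObservable<int> Iterate(int seed, Func<int, IObservable<int>> step) =>
        Observable.Create<int>(observer =>
        {
            // The subject carries every value of the sequence.
            var subject = new Subject<int>();

            // Each value starts the next asynchronous step; Switch unsubscribes
            // from the previous step's observable when a new one arrives.
            var next = subject.Select(step).Switch();

            var subscriptions = new CompositeDisposable(
                next.Subscribe(subject),      // feed results back into the loop
                subject.Subscribe(observer)); // forward values to the caller

            // Start the loop with the initial value.
            subject.OnNext(seed);

            return subscriptions;
        });

    static void Main()
    {
        // Simulated asynchronous transformation (an assumption for the demo).
        Func<int, IObservable<int>> step = x =>
            Observable.Return(x + 1).Delay(TimeSpan.FromMilliseconds(10));

        // Take(5) ends the otherwise infinite sequence and disposes the loop.
        var firstFive = Iterate(1, step).Take(5).ToList().Wait();
        Console.WriteLine(string.Join(", ", firstFive)); // prints 1, 2, 3, 4, 5
    }
}
```

Each step's result arrives on a scheduler callback rather than from inside the previous call, so the call stack does not grow with the number of values. This relies on the step being genuinely asynchronous: a step that emits synchronously during subscription (for example Observable.Return without Delay) would most likely re-enter the subject recursively and overflow the stack again.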

Enigmativity answered Apr 30 '23 11:04
