I want to generate an observable where each value of the observable is dependent on the one before it, starting from a single value. If I have a simple transformation between values like Func<int, int>
, it is easy to do with Observable.Generate
like so:
Func<int, IObservable<int>> mkInts = init =>
Observable.Generate(
init, // start value
_ => true, // continue ?
i => i + 1, // transformation function
i => i); // result selector
using (mkInts(1).Subscribe(Console.WriteLine))
{
Console.ReadLine();
}
This will happily write numbers on my screen until I press enter. However, my transformation function does some network IO, so the type is Func<int, IObservable<int>>
, so I cannot use that approach. Instead, I have tried this:
// simulate my transformation function
Func<int, IObservable<int>> mkInt = ts =>
Observable.Return(ts)
.Delay(TimeSpan.FromMilliseconds(10));
// pre-assign my generator function, since the function calls itself recursively
Func<int, IObservable<int>> mkInts = null;
// my generator function
mkInts = init =>
{
var ints = mkInt(init);
// here is where I depend on the previous value.
var nextInts = ints.SelectMany(i => mkInts(i + 1));
return ints.Concat(nextInts);
};
using (mkInts(1).Subscribe(Console.WriteLine))
{
Console.ReadLine();
}
But this will stackoverflow after printing about 5000 numbers. How can I solve this?
I think I've got a nice clean solution for you.
First-up, go back to using a Func<int, int>
- it can easily be turned into a Func<int, IObservable<int>>
using Observable.FromAsyncPattern
.
I used this for testing:
Func<int, int> mkInt = ts =>
{
Thread.Sleep(100);
return ts + 1;
};
Now here's the money maker:
Func<int, Func<int, int>, IObservable<int>> mkInts = (i0, fn) =>
Observable.Create<int>(o =>
{
var ofn = Observable
.FromAsyncPattern<int, int>(
fn.BeginInvoke,
fn.EndInvoke);
var s = new Subject<int>();
var q = s.Select(x => ofn(x)).Switch();
var r = new CompositeDisposable(new IDisposable[]
{
q.Subscribe(s),
s.Subscribe(o),
});
s.OnNext(i0);
return r;
});
The iterating function is turned into an asynchronous observable.
The q
variable feeds the values from the subject into the observable iterating function and selects the calculated observable. The Switch
method flattens out the result and ensures that each call to the observable iterating function is properly cleaned up.
Also, the use of a CompositeDisposable
allows the two subscriptions to be disposed of as one. Very neat!
It's easily used like this:
using (mkInts(7, mkInt).Subscribe(Console.WriteLine))
{
Console.ReadLine();
}
Now you have a fully parametrized version of your generator function. Nice, huh?
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With