Ok, Trying to understand Rx, kinda of lost here.
FromAsyncPattern is now deprecated so I took the example from here (section Light up Task with Rx), and it works, I just made a few changes, not using await just wait the observable and subscribing.....
What I don't understand is Why is called Twice the function SumSquareRoots?
 var res = Observable.FromAsync(ct => SumSquareRoots(x, ct))
                                      .Timeout(TimeSpan.FromSeconds(5));
            res.Subscribe(y => Console.WriteLine(y));
            res.Wait();
class Program
{
    static void Main(string[] args)
    {
        Samples();
    }
    static void Samples()
    {
        var x = 100000000;
        try
        {
            var res = Observable.FromAsync(ct => SumSquareRoots(x, ct))
                                      .Timeout(TimeSpan.FromSeconds(5));
            res.Subscribe(y => Console.WriteLine(y));
            res.Wait();
        }
        catch (TimeoutException)
        {
            Console.WriteLine("Timed out :-(");
        }
    }
    static Task<double> SumSquareRoots(long count, CancellationToken ct)
    {
        return Task.Run(() =>
        {
            var res = 0.0;
            Console.WriteLine("Why I'm called twice");
            for (long i = 0; i < count; i++)
            {
                res += Math.Sqrt(i);
                if (i % 10000 == 0 && ct.IsCancellationRequested)
                {
                    Console.WriteLine("Noticed cancellation!");
                    ct.ThrowIfCancellationRequested();
                }
            }
            return res;
        });
    }
}
                The reason that this is calling SumSquareRoots twice is because you're Subscribing twice:
// Subscribes to res
res.Subscribe(y => Console.WriteLine(y));
// Also Subscribes to res, since it *must* produce a result, even
// if that result is then discarded (i.e. Wait doesn't return IObservable)
res.Wait();
Subscribe is the foreach of Rx - just like if you foreach an IEnumerable twice, you could end up doing 2x the work, multiple Subscribes means multiple the work. To undo this, you could use a blocking call that doesn't discard the result:
Console.WriteLine(res.First());
Or, you could use Publish to "freeze" the result and play it back to > 1 subscriber (kind of like how you'd use ToArray in LINQ):
res = res.Publish();
res.Connect();
// Both subscriptions get the same result, SumSquareRoots is only called once
res.Subscribe(Console.WriteLine);
res.Wait();
The general rule you can follow is, that any Rx method that doesn't return IObservable<T> or Task<T> will result in a Subscription(*)
* - Not technically correct. But your brain will feel better if you think of it this way.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With