
Reactive Extensions: Observable.FromAsync calls the function twice

OK, I'm trying to understand Rx and I'm kind of lost here.

FromAsyncPattern is now deprecated, so I took the example from here (section "Light up Task with Rx"), and it works. I only made a few changes: instead of using await, I just Wait on the observable and subscribe to it.

What I don't understand is: why is the function SumSquareRoots called twice?

var res = Observable.FromAsync(ct => SumSquareRoots(x, ct));

res.Subscribe(y => Console.WriteLine(y));

res.Wait();


class Program
{
    static void Main(string[] args)
    {
        Samples();
    }

    static void Samples()
    {
        var x = 100000000;

        try
        {
            var res = Observable.FromAsync(ct => SumSquareRoots(x, ct));

            res.Subscribe(y => Console.WriteLine(y));

            res.Wait();
        }
        catch (TimeoutException)
        {
            Console.WriteLine("Timed out :-(");
        }
    }

    static Task<double> SumSquareRoots(long count, CancellationToken ct)
    {
        return Task.Run(() =>
        {
            var res = 0.0;
            Console.WriteLine("Why I'm called twice");
            for (long i = 0; i < count; i++)
            {
                res += Math.Sqrt(i);

                if (i % 10000 == 0 && ct.IsCancellationRequested)
                {
                    Console.WriteLine("Noticed cancellation!");
                }
            }

            return res;
        });
    }
}
jjchiw asked Dec 15 '22 17:12


1 Answer

SumSquareRoots is called twice because you're subscribing twice:

// Subscribes to res
res.Subscribe(y => Console.WriteLine(y));

// Also subscribes to res, since it *must* produce a result, even
// if that result is then discarded (i.e. Wait doesn't return IObservable)
res.Wait();

Subscribe is the foreach of Rx: just as iterating an IEnumerable with foreach twice can end up doing 2x the work, multiple Subscribes mean the work runs multiple times. To avoid this, you could use a blocking call that doesn't discard the result.
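Wait is itself such a call: it subscribes once, blocks until the sequence completes, and returns the last element. A minimal sketch, assuming the System.Reactive package is referenced; the class name, the smaller loop count, and the trimmed-down SumSquareRoots body are illustrative and not the asker's exact code:

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

class BlockingCallSketch
{
    // Simplified stand-in for the SumSquareRoots function from the question.
    static Task<double> SumSquareRoots(long count, CancellationToken ct)
    {
        return Task.Run(() =>
        {
            Console.WriteLine("SumSquareRoots started");
            var sum = 0.0;
            for (long i = 0; i < count; i++)
            {
                sum += Math.Sqrt(i);
            }
            return sum;
        }, ct);
    }

    static void Main()
    {
        var res = Observable.FromAsync(ct => SumSquareRoots(1000000, ct));

        // The only subscription: Wait() blocks and hands back the result,
        // so no separate Subscribe call is needed to print it.
        var result = res.Wait();
        Console.WriteLine(result);
    }
}
```

Because there is a single subscription, "SumSquareRoots started" should appear only once.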


Or you could use Publish to "freeze" the result and share it with more than one subscriber (kind of like how you'd use ToArray in LINQ):

res = res.Publish();

// Both subscriptions get the same result, SumSquareRoots is only called once
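
A fuller sketch of the Publish approach, assuming the System.Reactive package is referenced. The Work helper, the call counter, and the ManualResetEventSlim are hypothetical additions for illustration. Publish returns an IConnectableObservable<T>, and the underlying source is subscribed to only when Connect is called:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

class PublishSketch
{
    static int calls = 0;

    // Hypothetical stand-in for SumSquareRoots; counts how often it is invoked.
    static Task<double> Work(CancellationToken ct)
    {
        Interlocked.Increment(ref calls);
        return Task.Run(() => Math.Sqrt(16.0), ct);
    }

    static void Main()
    {
        IConnectableObservable<double> shared =
            Observable.FromAsync(ct => Work(ct)).Publish();

        using var done = new ManualResetEventSlim();

        // Neither Subscribe starts the work yet; they attach to the shared subject.
        shared.Subscribe(y => Console.WriteLine($"first: {y}"));
        shared.Subscribe(y => Console.WriteLine($"second: {y}"), () => done.Set());

        // Connect makes the single subscription to the underlying source.
        shared.Connect();

        // Block until the second observer has seen completion.
        done.Wait();
        Console.WriteLine($"Work was called {calls} time(s)");
    }
}
```

Both observers are attached before Connect on purpose: an observer that subscribes to a plain Publish after the source has already completed would only see the completion, not the value.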

The general rule you can follow is that any Rx method that doesn't return IObservable<T> or Task<T> will result in a Subscription(*)

* - Not technically correct. But your brain will feel better if you think of it this way.
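
The rule can be observed by counting subscriptions. A minimal sketch, assuming the System.Reactive package is referenced; the counter and the Defer-wrapped source are illustrative and not part of the original code:

```csharp
using System;
using System.Reactive.Linq;

class SubscriptionRuleSketch
{
    static void Main()
    {
        var subscriptions = 0;

        // Defer runs its factory once per subscription, so the counter
        // records how many times the source is actually subscribed to.
        var source = Observable.Defer(() =>
        {
            subscriptions++;
            return Observable.Return(42);
        });

        // Select returns IObservable<int>: it only composes, nothing runs yet.
        var mapped = source.Select(v => v * 2);
        Console.WriteLine($"after Select: {subscriptions}");

        // Wait returns int, not IObservable<int>, so it has to subscribe.
        var value = mapped.Wait();
        Console.WriteLine($"after Wait: {subscriptions}, value = {value}");
    }
}
```

The counter should stay at 0 after Select and move to 1 only once Wait is called.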

Ana Betts answered Apr 28 '23 10:04
