
Reactive Extensions: Observable.FromAsync calls the function twice

OK, I'm trying to understand Rx and I'm kind of lost here.

FromAsyncPattern is now deprecated, so I took the example from here (section "Light up Task with Rx"), and it works. I only made a few changes: instead of using await, I subscribe to the observable and then wait on it.

What I don't understand is why the function SumSquareRoots is called twice.

var res = Observable.FromAsync(ct => SumSquareRoots(x, ct))
                    .Timeout(TimeSpan.FromSeconds(5));

res.Subscribe(y => Console.WriteLine(y));

res.Wait();


using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        Samples();
    }

    static void Samples()
    {
        var x = 100000000;

        try
        {
            var res = Observable.FromAsync(ct => SumSquareRoots(x, ct))
                                      .Timeout(TimeSpan.FromSeconds(5));

            res.Subscribe(y => Console.WriteLine(y));

            res.Wait();
        }
        catch (TimeoutException)
        {
            Console.WriteLine("Timed out :-(");
        }
    }

    static Task<double> SumSquareRoots(long count, CancellationToken ct)
    {
        return Task.Run(() =>
        {
            var res = 0.0;
            Console.WriteLine("Why I'm called twice");
            for (long i = 0; i < count; i++)
            {
                res += Math.Sqrt(i);

                if (i % 10000 == 0 && ct.IsCancellationRequested)
                {
                    Console.WriteLine("Noticed cancellation!");
                    ct.ThrowIfCancellationRequested();
                }
            }

            return res;
        });
    }
}
jjchiw asked Dec 15 '22 17:12



1 Answer

SumSquareRoots is called twice because you're subscribing twice:

// Subscribes to res
res.Subscribe(y => Console.WriteLine(y));

// Also Subscribes to res, since it *must* produce a result, even
// if that result is then discarded (i.e. Wait doesn't return IObservable)
res.Wait();

Subscribe is the foreach of Rx - just as running foreach over an IEnumerable twice can end up doing 2x the work, subscribing multiple times multiplies the work. To avoid this, you could use a blocking call that doesn't discard the result:

Console.WriteLine(res.First());
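
The foreach analogy can be seen with plain LINQ, no Rx required. This is only a sketch: the class name, the call counter, and the Expensive method are hypothetical stand-ins for SumSquareRoots. Enumerating a deferred query twice runs the work twice, while materializing it with ToArray runs it once.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class ForeachAnalogy
{
    static int calls;

    // Hypothetical stand-in for an expensive computation; counts its own invocations.
    static int Expensive(int n)
    {
        calls++;
        return n * n;
    }

    // Enumerates a deferred query twice and reports how often Expensive ran.
    public static int CallsWhenEnumeratedTwice()
    {
        calls = 0;
        IEnumerable<int> query = Enumerable.Range(1, 3).Select(Expensive);
        var total = 0;
        foreach (var v in query) total += v;  // runs Expensive for each element
        foreach (var v in query) total += v;  // runs it again for each element
        return calls;
    }

    // Materializes the query once, then enumerates the cached array twice.
    public static int CallsWhenMaterializedFirst()
    {
        calls = 0;
        int[] frozen = Enumerable.Range(1, 3).Select(Expensive).ToArray();
        var total = 0;
        foreach (var v in frozen) total += v;  // reads cached values only
        foreach (var v in frozen) total += v;
        return calls;
    }

    static void Main()
    {
        Console.WriteLine(CallsWhenEnumeratedTwice());   // prints 6
        Console.WriteLine(CallsWhenMaterializedFirst()); // prints 3
    }
}
```

Each Subscribe on the FromAsync observable behaves like the first method: it starts the work over from scratch.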

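Or, you could use Publish to share a single underlying subscription among > 1 subscriber (kind of like how you'd use ToArray in LINQ):

// Publish returns IConnectableObservable<double>, so keep it in its own variable
var published = res.Publish();
published.Connect();

// Both subscriptions share one call to SumSquareRoots, as long as they
// subscribe before the task completes (Publish does not replay values)
published.Subscribe(Console.WriteLine);
published.Wait();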
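
Because Publish only forwards values to observers that are already subscribed, a subscriber that arrives after the task has finished can miss the single result. One alternative, sketched here under the assumption that the System.Reactive package is referenced, is PublishLast, which caches the final value and hands it to later subscribers too. The class name, the call counter, and the shortened SumSquareRoots are hypothetical stand-ins, not the original program.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

static class PublishLastSketch
{
    static int calls;

    // Hypothetical, much smaller stand-in for SumSquareRoots; counts its invocations.
    static Task<double> SumSquareRoots(long count, CancellationToken ct)
    {
        return Task.Run(() =>
        {
            Interlocked.Increment(ref calls);
            var sum = 0.0;
            for (long i = 0; i < count; i++) sum += Math.Sqrt(i);
            return sum;
        }, ct);
    }

    // Returns how many times SumSquareRoots ran for two consumers.
    public static int CallsWithTwoConsumers()
    {
        calls = 0;
        IConnectableObservable<double> shared = Observable
            .FromAsync(ct => SumSquareRoots(1000, ct))
            .Timeout(TimeSpan.FromSeconds(5))
            .PublishLast();

        // Connect performs the one and only subscription to the FromAsync source.
        using (shared.Connect())
        {
            shared.Subscribe(y => Console.WriteLine(y)); // first consumer
            shared.Wait();                               // second consumer, blocks until done
        }
        return calls;
    }

    static void Main()
    {
        Console.WriteLine(CallsWithTwoConsumers()); // last line printed is 1
    }
}
```

Replay(1) would be a similar choice if the source could produce more than one value.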

A general rule you can follow: any Rx method that doesn't return IObservable<T> or Task<T> will result in a subscription(*).

* - Not technically correct. But your brain will feel better if you think of it this way.
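
The rule of thumb can be checked by counting subscriptions. This is a rough sketch assuming the System.Reactive package; the class and method names are hypothetical. Observable.Defer runs its factory once per subscription, so the counter shows which calls actually subscribe.

```csharp
using System;
using System.Reactive.Linq;

static class SubscriptionCounter
{
    // Returns the subscription count after Select, after Wait, and after Subscribe.
    public static int[] CountSubscriptions()
    {
        var subscriptions = 0;

        // The factory runs once for every subscriber.
        var source = Observable.Defer(() =>
        {
            subscriptions++;
            return Observable.Return(42);
        });

        var mapped = source.Select(v => v + 1); // returns IObservable<int>: nothing subscribed yet
        var afterSelect = subscriptions;

        mapped.Wait();                          // returns int: subscribes and blocks
        var afterWait = subscriptions;

        mapped.Subscribe(_ => { });             // returns IDisposable: subscribes again
        var afterSubscribe = subscriptions;

        return new[] { afterSelect, afterWait, afterSubscribe };
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", CountSubscriptions())); // prints 0, 1, 2
    }
}
```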

Ana Betts answered Apr 28 '23 10:04
