Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

IObservable to produce results in infinite loop [duplicate]

This is a code that I've developed so far:

var observable = Observable.Create<string>(async observer =>
{
    var wc = new WebClient { UseDefaultCredentials = true };
    observer.OnNext(await wc.DownloadStringTaskAsync("http://ya.ru"));
});

observable.Subscribe(
    res => Debug.WriteLine("got result: {0}", res), 
    exc => Debug.WriteLine("exception: {0}", exc.Message)
); 

This correctly fetches website's data and triggers my callback once. What I want is to have an infinite loop that acts like this: await result -> call OnNext -> wait n seconds -> repeat an operation.

Creating an Observable.Interval and SelectMany it into my Observable won't quite do because this will be querying a website for a fixed periods of time. I want the next call to be triggered only after the previous succeeds or fails. What's the most elegant and concise way to achieve this?

like image 973
src091 Avatar asked Mar 22 '23 05:03

src091


1 Answers

Without changing too much of your code, you could effectively concat a delay after the yield, and then repeat the whole thing indefinitely.

var observable = Observable.Create<string>(async observer =>
{
    var wc = new WebClient { UseDefaultCredentials = true };
    observer.OnNext(await wc.DownloadStringTaskAsync("http://ya.ru"));
});

observable
    .Concat(Observable.Empty<string>().Delay(TimeSpan.FromSeconds(1)))
    .Repeat()
    .Subscribe(
      res => Debug.WriteLine("got result: {0}", res), 
      exc => Debug.WriteLine("exception: {0}", exc.Message)
    ); 

However, there's better ways of doing this, because in the previous instance you're creating a new WebClient every second. Instead, you could do something like this...

using System.Reactive.Threading.Tasks;

var observable = Observable.Using(() => new WebClient(), (client) =>
    client.DownloadStringTaskAsync("http://ya.ru")
        .ToObservable()
        .Concat(Observable.Empty<string>().Delay(TimeSpan.FromSeconds(1)))
        .Repeat()
        );

And if you wanted to repeat on errors, you could add Retry...

var observable = Observable.Using(() => new WebClient(), (client) =>
    client.DownloadStringTaskAsync("http://ya.ru")
        .ToObservable()
        .Retry(3)
        .Concat(Observable.Empty<string>().Delay(TimeSpan.FromSeconds(1)))
        .Repeat()
        );
like image 163
cwharris Avatar answered Mar 23 '23 23:03

cwharris