Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Poll a webservice using Reactive Extensions and bind the last x results

I'm trying to use Reactive Extensions (Rx) for a task where it seems to be a good fit, polling at a specific interval a web service and display its last x results.

I have a web service that sends me the status of an instrument I want to monitor. I would like to poll this instrument at a specific rate and display in a list the last 20 status that have been polled.

So my list would be like a "moving window" of the service result.

I'm developing a WPF app with Caliburn.Micro, but I don't think this is very relevant.

What I managed to get until now is the following (just a sample app that I hacked quickly, I'm not going to do this in the ShellViewModel in the real app):

public class ShellViewModel : Caliburn.Micro.PropertyChangedBase, IShell
{
    private ObservableCollection<string> times;
    private string currentTime;

    public ShellViewModel()
    {
        times = new ObservableCollection<string>();

        Observable
            .Interval(TimeSpan.FromSeconds(1))
            .SelectMany(x => this.GetCurrentDate().ToObservable())
            .ObserveOnDispatcher()
            .Subscribe(x =>
            {
                this.CurrentTime = x;
                this.times.Add(x);
            });
    }

    public IEnumerable<string> Times
    {
        get
        {
            return this.times;
        }
    }

    public string CurrentTime
    {
        get
        {
            return this.currentTime;
        }
        set
        {
            this.currentTime = value;
            this.NotifyOfPropertyChange(() => this.CurrentTime);
        }
    }

    private async Task<string> GetCurrentDate()
    {
        var client = new RestClient("http://www.timeapi.org");
        var request = new RestRequest("/utc/now.json");

        var response = await client.ExecuteGetTaskAsync(request);

        return response.Content;
    }
}

In the view I have just a label bound to the CurrentTime property and a list bound to the Times property.

The issue I have is:

  • It's not limited to the 20 items in the list as I always add items to the ObservableCollection but I can't find a better way to databind
  • The Interval doesn't work as I'd like. If the querying takes more than 1 second to run, two queries will be run in parallel, which I'd like not to happen. My goal would be that the query repeats indefinitely but at a pace of no more than 1 query every seconds. If a query makes more than 1 second to end, it should wait for it to have finish and directly trigger the new query.

Second edit:

Previous edit below was me being stupid and very confused, it triggers events continuously because Interval is something continuous that never ends. Brandon's solution is correct and works as expected.

Edit:

Based on Brandon's example, I tried to do the following code in LinqPad:

Observable
    .Merge(Observable.Interval(TimeSpan.FromSeconds(2)), Observable.Interval(TimeSpan.FromSeconds(10)))
    .Repeat()
    .Scan(new List<double>(), (list, item) => { list.Add(item); return list; })
    .Subscribe(x => Console.Out.WriteLine(x))

And I can see that the write to the console occurs every 2 seconds, and not every 10. So the Repeat doesn't wait for both Observable to be finished before repeating.

like image 423
Gimly Avatar asked Jul 03 '14 12:07

Gimly


1 Answers

Try this:

// timer that completes after 1 second
var intervalTimer = Observable
    .Empty<string>()
    .Delay(TimeSpan.FromSeconds(1));

// queries one time whenever subscribed
var query = Observable.FromAsync(GetCurrentDate);

// query + interval timer which completes
// only after both the query and the timer
// have expired

var intervalQuery = Observable.Merge(query, intervalTimer);

// Re-issue the query whenever intervalQuery completes
var queryLoop = intervalQuery.Repeat();

// Keep the 20 most recent results
// Note.  Use an immutable list for this
// https://www.nuget.org/packages/microsoft.bcl.immutable
// otherwise you will have problems with
// the list changing while an observer
// is still observing it.
var recentResults = queryLoop.Scan(
    ImmutableList.Create<string>(), // starts off empty
    (acc, item) =>
    {
        acc = acc.Add(item);
        if (acc.Count > 20)
        {
            acc = acc.RemoveAt(0);
        }

        return acc;
    });

// store the results
recentResults
    .ObserveOnDispatcher()
    .Subscribe(items =>
    {
        this.CurrentTime = items[0];
        this.RecentItems = items;
    });
like image 183
Brandon Avatar answered Oct 11 '22 11:10

Brandon