I'm trying to use Reactive Extensions (Rx) for a task where it seems to be a good fit: polling a web service at a specific interval and displaying its last x results.
I have a web service that sends me the status of an instrument I want to monitor. I would like to poll this instrument at a specific rate and display the last 20 polled statuses in a list.
So my list would be like a "moving window" of the service results.
I'm developing a WPF app with Caliburn.Micro, but I don't think this is very relevant.
What I have so far is the following (just a sample app that I hacked together quickly; I'm not going to do this in the ShellViewModel in the real app):
    public class ShellViewModel : Caliburn.Micro.PropertyChangedBase, IShell
    {
        private ObservableCollection<string> times;
        private string currentTime;

        public ShellViewModel()
        {
            times = new ObservableCollection<string>();
            Observable
                .Interval(TimeSpan.FromSeconds(1))
                .SelectMany(x => this.GetCurrentDate().ToObservable())
                .ObserveOnDispatcher()
                .Subscribe(x =>
                {
                    this.CurrentTime = x;
                    this.times.Add(x);
                });
        }

        public IEnumerable<string> Times
        {
            get
            {
                return this.times;
            }
        }

        public string CurrentTime
        {
            get
            {
                return this.currentTime;
            }
            set
            {
                this.currentTime = value;
                this.NotifyOfPropertyChange(() => this.CurrentTime);
            }
        }

        private async Task<string> GetCurrentDate()
        {
            var client = new RestClient("http://www.timeapi.org");
            var request = new RestRequest("/utc/now.json");
            var response = await client.ExecuteGetTaskAsync(request);
            return response.Content;
        }
    }
In the view I have just a label bound to the CurrentTime property and a list bound to the Times property.
The issue I have is that I always add items to the ObservableCollection, so the list is not limited to the last 20 results, but I can't find a better way to databind.
Second edit:
The previous edit below was me being stupid and very confused. It triggers events continuously because Interval is continuous and never completes, so Repeat never gets a chance to resubscribe. Brandon's solution is correct and works as expected.
Edit:
Based on Brandon's example, I tried to do the following code in LinqPad:
    Observable
        .Merge(Observable.Interval(TimeSpan.FromSeconds(2)), Observable.Interval(TimeSpan.FromSeconds(10)))
        .Repeat()
        .Scan(new List<double>(), (list, item) => { list.Add(item); return list; })
        .Subscribe(x => Console.Out.WriteLine(x))
I can see that the write to the console occurs every 2 seconds, not every 10. So Repeat doesn't wait for both observables to finish before repeating.
Try this:
    // timer that completes after 1 second
    var intervalTimer = Observable
        .Empty<string>()
        .Delay(TimeSpan.FromSeconds(1));

    // queries one time whenever subscribed
    var query = Observable.FromAsync(GetCurrentDate);

    // query + interval timer which completes
    // only after both the query and the timer
    // have completed
    var intervalQuery = Observable.Merge(query, intervalTimer);

    // Re-issue the query whenever intervalQuery completes
    var queryLoop = intervalQuery.Repeat();

    // Keep the 20 most recent results
    // Note: use an immutable list for this
    // https://www.nuget.org/packages/microsoft.bcl.immutable
    // otherwise you will have problems with
    // the list changing while an observer
    // is still observing it.
    var recentResults = queryLoop.Scan(
        ImmutableList.Create<string>(), // starts off empty
        (acc, item) =>
        {
            acc = acc.Add(item);
            if (acc.Count > 20)
            {
                acc = acc.RemoveAt(0);
            }
            return acc;
        });
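    // store the results
    recentResults
        .ObserveOnDispatcher()
        .Subscribe(items =>
        {
            // new items are appended at the end of the list,
            // so the most recent result is the last element
            this.CurrentTime = items[items.Count - 1];
            this.RecentItems = items;
        });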
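The "keep the 20 most recent results" step can also be pulled out of the Scan lambda into a small pure function, which makes it easy to check on its own. This is only a minimal sketch: RecentWindow and Append are hypothetical names that appear nowhere in the code above, and it assumes the System.Collections.Immutable types are available.

```csharp
using System.Collections.Immutable;

// Hypothetical helper, not part of Rx or of the code above.
public static class RecentWindow
{
    // Returns a new list with `item` appended at the end, dropping the
    // oldest entries from the front so that at most `maxCount` remain.
    // The input list is never mutated.
    public static ImmutableList<T> Append<T>(ImmutableList<T> window, T item, int maxCount)
    {
        var next = window.Add(item);
        return next.Count > maxCount
            ? next.RemoveRange(0, next.Count - maxCount)
            : next;
    }
}
```

With this helper, the accumulator passed to Scan could probably be written as queryLoop.Scan(ImmutableList<string>.Empty, (acc, item) => RecentWindow.Append(acc, item, 20)). Since each call returns a new list, an observer that is still reading the previous list is unaffected.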