
Event throttling / queuing - Reactive Extensions?

I am looking to implement some throttling behavior in one of my viewmodels. It's a Silverlight application, but I don't think that's particularly important.

Consider a class with three properties:

  • Property1
  • Property2
  • Property3

Whenever one of these properties is updated, a refresh is necessary.

private void Refresh()
{
    //Call out to the server, do something when it comes back
}

My goals are as follows:

  • If a Refresh is in progress, we should ideally cancel the call to the server, and issue a new request
  • If a property is changed, we should leave some small window of time (perhaps 0.1 seconds) where we wait for additional changes. This way, if multiple properties are changed rapidly (for example, programmatically) we don't spam the server with requests. It's OK for that 0.1 second window to reset on each change, but not required.

If it matters, I am using a ChannelFactory implementation for the server call.

What kind of patterns can I use to accomplish this? Is this something Reactive Extensions can help me with?

Edit:

Marking Paul's answer as correct. While ReactiveUI doesn't currently work against Silverlight 5, the answer clearly outlines the approach and composition steps for solving the problem with Rx.

Shaun Rowan asked Jun 15 '12 16:06


1 Answer

Here's how you would do this with ReactiveUI:

IObservable<TheData> FetchNewData()
{
    // TODO: Call the server and return the result as an observable
    throw new NotImplementedException();
}

this.WhenAny(x => x.Property1, x => x.Property2, x => x.Property3, (x,y,z) => Unit.Default)
    .Throttle(TimeSpan.FromMilliseconds(200), RxApp.DeferredScheduler)
    .Select(x => FetchNewData())
    .Switch()    // We only care about the request corresponding to the latest values of Property1-3
    .ToProperty(this, x => x.Data);
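If ReactiveUI isn't available on your platform, roughly the same composition can be sketched with plain Rx. This is only a sketch under assumptions: it assumes the System.Reactive package is referenced, and `changes` and `FetchAsync` are hypothetical stand-ins (a subject that each property setter would push into, and a fake server call that honors cancellation). The console program is just there to exercise the pipeline.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    // Hypothetical stand-in for the server call. It observes the token,
    // so unsubscribing (which Switch does for us) cancels the request.
    static async Task<string> FetchAsync(int version, CancellationToken ct)
    {
        await Task.Delay(300, ct);
        return "data for change #" + version;
    }

    static void Main()
    {
        // Hypothetical: each property setter would call changes.OnNext(...).
        var changes = new Subject<int>();

        using (changes
            // Wait for a quiet period; the window restarts on every change.
            .Throttle(TimeSpan.FromMilliseconds(100))
            // Start one request per settled change, lazily, with cancellation.
            .Select(v => Observable.FromAsync(ct => FetchAsync(v, ct)))
            // Unsubscribe from the previous request when a newer one starts.
            .Switch()
            .Subscribe(data => Console.WriteLine(data)))
        {
            // Three rapid changes collapse into a single request.
            changes.OnNext(1);
            changes.OnNext(2);
            changes.OnNext(3);

            // Keep the process alive long enough for the request to finish.
            Thread.Sleep(1000);
        }
    }
}
```

Because the three changes arrive inside one throttle window, only the request for the last change should reach the subscriber. In a viewmodel you would subscribe on the UI scheduler and assign the result to your data property instead of writing to the console.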

Update: Here's how to guarantee that only one request is running at a time, with the caveat that you may get out-of-order results.

this.WhenAny(x => x.Property1, x => x.Property2, x => x.Property3, (x,y,z) => Unit.Default)
    .Throttle(TimeSpan.FromMilliseconds(200), RxApp.DeferredScheduler)
    .Select(_ => Observable.Defer(() => FetchNewData()))
    .Merge(1)
    .ToProperty(this, x => x.Data);

The behavior you describe might not actually be desirable: if the properties keep changing, you'll end up with a queue of stale requests to issue. You could optimize this with something like a "BufferingSwitch()" operator that doesn't return its results until it's sure there are no further changes. That would actually be cool to write.

Moral of the story, Async Is Complicated™ :)

Ana Betts answered Oct 30 '22 14:10
