Let's pretend I have something like this:
<TextBox Text="{Binding Text, Mode=TwoWay}" />
And something like this:
public class MyViewModel : INotifyPropertyChanged
{
    public MyViewModel()
    {
        // run DoWork() when this.Text changes
        Observable.FromEventPattern<PropertyChangedEventArgs>(this, "PropertyChanged")
            .Where(x => x.EventArgs.PropertyName.Equals("Text"))
            .Subscribe(async x => await DoWork());
    }

    private async Task DoWork()
    {
        await Task.Delay(this.Text.Length * 100);
    }

    public event PropertyChangedEventHandler PropertyChanged;

    private string _Text = "Hello World";
    public string Text
    {
        get { return _Text; }
        set
        {
            _Text = value;
            if (this.PropertyChanged != null)
                this.PropertyChanged(this, new PropertyChangedEventArgs("Text"));
        }
    }
}
In this scenario, the user could be typing very quickly. I need:
1. DoWork() must not run while DoWork() is already running
2. The user may type in spurts: some changes, pause, some changes
3. DoWork() is NOT required for every change, only the last change
4. There is no need for DoWork() to be called more frequently than once per second
5. DoWork() cannot wait until the last change if the spurt is > 1 second
6. DoWork() should not be called while the system is idle
7. The duration of DoWork() varies based on the length of this.Text
The question isn't if Rx can do this. I know it can. What's the proper syntax?
While I kind of agree with James World, I think you can Do Better, if we use just a bit of mutable state. What if DoWork looked like this:
// Observable.Start returns IObservable<Unit>, so the field is typed accordingly
IObservable<Unit> doingWork;

public IObservable<Unit> DoWork()
{
    if (doingWork != null) return doingWork;

    doingWork = Observable.Start(() => {
        // XXX: Do work
        Thread.Sleep(1000);

        // We only kick off this 1sec timeout *after* we finish doing work
        Observable.Timer(TimeSpan.FromSeconds(1.0), DispatcherScheduler.Instance)
            .Subscribe(_ => doingWork = null);
    });

    return doingWork;
}
Now DoWork debounces itself Automagically™, and we can get rid of the await-in-Subscribe silliness. In the subscription, we set the throttle to 250ms to be Quick-But-Not-Too-Quick.
This initially appears to violate requirement #5 above, but we've ensured that anyone calling DoWork too quickly just gets the previous run's results. The effect is that DoWork will be called many times, but won't necessarily do anything. It does ensure, though, that if we aren't doing work, we won't have a 1sec delay after the user stops typing, as Throttle(1.seconds) would:
Observable.FromEventPattern<PropertyChangedEventArgs>(this, "PropertyChanged")
    .Where(x => x.EventArgs.PropertyName.Equals("Text"))
    .Throttle(TimeSpan.FromMilliseconds(250), DispatcherScheduler.Instance)
    .SelectMany(_ => DoWork())
    .Catch<Unit, Exception>(ex => {
        Console.WriteLine("Oh Crap, DoWork failed: {0}", ex);
        return Observable.Empty<Unit>();
    })
    .Subscribe(_ => Console.WriteLine("Did work"));
I think a simpler and reusable way to solve your problem might actually be async/await-based rather than Rx-based. Check out the single-threaded EventThrottler class implementation I got as an answer to my 'Is there such a synchronization tool as “single-item-sized async task buffer”?' question. With that you can rewrite your DoWork() method as simply:
private void DoWork()
{
    EventThrottler.Default.Run(async () =>
    {
        await Task.Delay(1000);
        //do other stuff
    });
}
and call it every time your text changes. No Rx required. Also, if you are already using the WinRT XAML Toolkit, the class is in there.
Here's a copy of the throttler class code as a quick reference:
public class EventThrottler
{
    private Func<Task> next = null;
    private bool isRunning = false;

    public async void Run(Func<Task> action)
    {
        if (isRunning)
            next = action;
        else
        {
            isRunning = true;
            try
            {
                await action();
                while (next != null)
                {
                    var nextCopy = next;
                    next = null;
                    await nextCopy();
                }
            }
            finally
            {
                isRunning = false;
            }
        }
    }

    private static Lazy<EventThrottler> defaultInstance =
        new Lazy<EventThrottler>(() => new EventThrottler());

    public static EventThrottler Default
    {
        get { return defaultInstance.Value; }
    }
}
You may be surprised how hard this is as a pure Rx solution. It's subtly different from the similar (and typical Rx 101) example of submitting a throttled search in response to textbox changes: in that case, it's OK to fire off concurrent searches, cancelling all but the latest one.
In this case, once DoWork() is off and running it can't be replaced or interrupted.
The problem is that Rx streams flow in one direction and can't "talk backwards" - so events queue up against slow consumers. To drop events due to slow consumers is quite hard in Rx.
It's much easier in a world where DoWork() can be cancelled and replaced when a new (probably throttled) event arrives.
First I present a pure Rx solution. Then at the end, a simpler approach where the slow consumer is dealt with by a dispatching mechanism outside of Rx.
For the pure approach, you'll need this helper extension method, which drops events queued against a slow consumer:
public static IObservable<T> ObserveLatestOn<T>(
    this IObservable<T> source, IScheduler scheduler)
{
    return Observable.Create<T>(observer =>
    {
        Notification<T> outsideNotification = null;
        var gate = new object();
        bool active = false;
        var cancelable = new MultipleAssignmentDisposable();
        var disposable = source.Materialize().Subscribe(thisNotification =>
        {
            bool wasNotAlreadyActive;
            lock (gate)
            {
                wasNotAlreadyActive = !active;
                active = true;
                outsideNotification = thisNotification;
            }

            if (wasNotAlreadyActive)
            {
                cancelable.Disposable = scheduler.Schedule(self =>
                {
                    Notification<T> localNotification = null;
                    lock (gate)
                    {
                        localNotification = outsideNotification;
                        outsideNotification = null;
                    }
                    localNotification.Accept(observer);
                    bool hasPendingNotification = false;
                    lock (gate)
                    {
                        hasPendingNotification = active = (outsideNotification != null);
                    }
                    if (hasPendingNotification)
                    {
                        self();
                    }
                });
            }
        });
        return new CompositeDisposable(disposable, cancelable);
    });
}
With this available you can then do something like:
// run DoWork() when this.Text changes
Observable.FromEventPattern<PropertyChangedEventArgs>(this, "PropertyChanged")
    .Where(x => x.EventArgs.PropertyName.Equals("Text"))
    .Sample(TimeSpan.FromSeconds(1)) // get the latest event in each second
    .ObserveLatestOn(Scheduler.Default) // drop all but the latest event
    .Subscribe(x => DoWork().Wait()); // block to avoid overlap
To be honest, you are probably better off avoiding the pure Rx solution here, and instead DON'T call DoWork() directly from a subscriber. I would wrap it with an intermediate dispatching mechanism called from the Subscribe method that handles not calling it if it's already running - the code would be way simpler to maintain.
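For illustration, here is a minimal sketch of what such an intermediate dispatcher might look like. CoalescingDispatcher and Request are hypothetical names made up for this example; they are not part of Rx or of any library mentioned in this thread. The sketch assumes the work is an async method returning Task, and that it is acceptable to swallow and log exceptions so that a failed run does not block later ones.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper (not from Rx or the answers above): runs an async
// action so that runs never overlap, and reruns it once more if any
// requests arrived while a run was in progress.
public sealed class CoalescingDispatcher
{
    private readonly Func<Task> work;
    private readonly object gate = new object();
    private bool running;   // true while a run loop is active
    private bool pending;   // true if a request arrived during a run

    public CoalescingDispatcher(Func<Task> work)
    {
        this.work = work;
    }

    // Safe to call from any thread, e.g. from an Rx Subscribe callback.
    public void Request()
    {
        lock (gate)
        {
            if (running)
            {
                // Remember that at least one more run is needed, then return.
                pending = true;
                return;
            }
            running = true;
        }
        // Fire and forget; the returned task is intentionally not awaited.
        _ = RunLoopAsync();
    }

    private async Task RunLoopAsync()
    {
        while (true)
        {
            try
            {
                await work();
            }
            catch (Exception ex)
            {
                // Assumption: log and carry on rather than stop dispatching.
                Console.WriteLine("Work failed: {0}", ex);
            }

            lock (gate)
            {
                if (!pending)
                {
                    running = false;
                    return;
                }
                // Collapse everything that queued up into a single rerun.
                pending = false;
            }
        }
    }
}
```

With something like this, the subscriber would shrink to roughly .Subscribe(_ => dispatcher.Request()), where dispatcher is created once as new CoalescingDispatcher(DoWork). Requests that arrive during a run collapse into one follow-up run, so three rapid calls should result in two executions of the work rather than three. Note that this only covers the "no overlap, last change wins" part; any rate limiting (Sample or Throttle) would still sit upstream in the Rx pipeline.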
EDIT:
After thinking on this for a few days, I didn't do any better than some of the other answers here - I'll leave the above for interest, but I think I like Filip Skakun's approach the best.