Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Throttling Events and Locking Methods

Let's pretend I have something like this:

<TextBox Text="{Binding Text, Mode=TwoWay}" />

And something like this:

public class MyViewModel : INotifyPropertyChanged
{
    public MyViewModel()
    {
        // run DoWork() when this.Text changes
        Observable.FromEventPattern<PropertyChangedEventArgs>(this, "PropertyChanged")
            .Where(x => x.EventArgs.PropertyName.Equals("Text"))
            .Subscribe(async x => await DoWork());
    }

    private async Task DoWork()
    {
        await Task.Delay(this.Text.Length * 100);
    }

    public event PropertyChangedEventHandler PropertyChanged;

    private string _Text = "Hello World";
    public string Text
    {
        get { return _Text; }
        set
        {
            _Text = value;
            if (this.PropertyChanged != null)
                this.PropertyChanged(this, new PropertyChangedEventArgs("Text"));
        }
    }
}

In this scenario, the user could be typing very quickly. I need:

  1. DoWork() must not run while DoWork() is already running

  2. The user may type in spurts, some changes, pause, some changes

  3. DoWork() is NOT required for every change, only the last change

  4. There is no need for DoWork() to be called more frequently than 1 second

  5. DoWork() cannot wait until the last change, if the spurt is > 1 second

  6. DoWork() should not be called while the system is idle

  7. The duration of DoWork() varies based on the length of this.Text

The question isn't if Rx can do this. I know it can. What's the proper syntax?

like image 676
Jerry Nixon Avatar asked Feb 18 '14 16:02

Jerry Nixon


3 Answers

While I kind of agree with James World, I think you can Do Better, if we use just a bit of mutable state. What if DoWork looked like this:

AsyncSubject<Unit> doingWork;
public IObservable<Unit> DoWork()
{
    if (doingWork != null) return doingWork;

    doingWork = Observable.Start(() => {
        // XXX: Do work
        Thread.Sleep(1000);

        // We only kick off this 1sec timeout *after* we finish doing work
        Observable.Timer(TimeSpan.FromSeconds(1.0), DispatcherScheduler.Instance)
            .Subscribe(_ => doingWork = null);
    });

    return doingWork;
}

Now, DoWork debounces itself Automagically™, and we can get rid of this await-in-Subscribe silliness; we set the throttle to 250ms to be Quick-But-Not-Too-Quick.

This initially appears to violate requirement #5 above, but we've ensured that anyone calling DoWork too quickly just gets the previous run's results - the effect will be that DoWork will be called many times, but not necessarily do anything. This ensures though, that if we aren't doing work, we won't have a 1sec delay after the user stops typing, like Throttle(1.seconds) would

    Observable.FromEventPattern<PropertyChangedEventArgs>(this, "PropertyChanged")
        .Where(x => x.EventArgs.PropertyName.Equals("Text"))
        .Throttle(TimeSpan.FromMilliseconds(250), DispatcherScheduler.Instance)
        .SelectMany(_ => DoWork())
        .Catch<Unit, Exception>(ex => {
            Console.WriteLine("Oh Crap, DoWork failed: {0}", ex);
            return Observable.Empty<Unit>();
        })
        .Subscribe(_ => Console.WriteLine("Did work"));
like image 176
Ana Betts Avatar answered Sep 17 '22 22:09

Ana Betts


I think a simpler and reusable way to solve your problem might actually be async/await-based rather than RX-based. Check out the single threaded EventThrottler class implementation I got as an answer to my 'Is there such a synchronization tool as “single-item-sized async task buffer”?' question. With that you can rewrite your DoWork() method as simply:

private void DoWork()
{
    EventThrottler.Default.Run(async () =>
    {
        await Task.Delay(1000);
        //do other stuff
    });
}

and call it every time your text changes. No RX required. Also, if you are already using WinRT XAML Toolkit - the class is in there.

Here's a copy of the throttler class code as a quick reference:

public class EventThrottler
{
    private Func<Task> next = null;
    private bool isRunning = false;

    public async void Run(Func<Task> action)
    {
        if (isRunning)
            next = action;
        else
        {
            isRunning = true;

            try
            {
                await action();

                while (next != null)
                {
                    var nextCopy = next;
                    next = null;
                    await nextCopy();
                }
            }
            finally
            {
                isRunning = false;
            }
        }
    }

    private static Lazy<EventThrottler> defaultInstance =
        new Lazy<EventThrottler>(() => new EventThrottler());
    public static EventThrottler Default
    {
        get { return defaultInstance.Value; }
    }
}
like image 40
Filip Skakun Avatar answered Sep 17 '22 22:09

Filip Skakun


You may be surprised how hard this is as a pure RX solution. It's subtly different to the similar (and typical Rx 101 example) of submitting a throttled search in response to textbox changes - in that case, it's ok to fire off concurrent searches, cancelling all but the latest one.

In this case, once DoWork() is off and running it can't be replaced or interrupted.

The problem is that Rx streams flow in one direction and can't "talk backwards" - so events queue up against slow consumers. To drop events due to slow consumers is quite hard in Rx.

It's much easier in a world where DoWork() can be cancelled and replaced when a new (probably throttled) event arrives.

First I present a pure Rx solution. Then at the end, a simpler approach where the slow consumer is dealt with by a dispatching mechanism outside of Rx.

For the pure approach, you'll need this helper extension method to drop events queued against a slow consumer which you can read about here:

public static IObservable<T> ObserveLatestOn<T>(
    this IObservable<T> source, IScheduler scheduler)
{
    return Observable.Create<T>(observer =>
    {
        Notification<T> outsideNotification = null;
        var gate = new object();
        bool active = false;

        var cancelable = new MultipleAssignmentDisposable();
        var disposable = source.Materialize().Subscribe(thisNotification =>
        {
            bool wasNotAlreadyActive;
            lock (gate)
            {
                wasNotAlreadyActive = !active;
                active = true;
                outsideNotification = thisNotification;
            }

            if (wasNotAlreadyActive)
            {
                cancelable.Disposable = scheduler.Schedule(self =>
                {
                    Notification<T> localNotification = null;
                    lock (gate)
                    {
                        localNotification = outsideNotification;
                        outsideNotification = null;
                    }
                    localNotification.Accept(observer);
                    bool hasPendingNotification = false;
                    lock (gate)
                    {
                        hasPendingNotification = active = (outsideNotification != null);
                    }
                    if (hasPendingNotification)
                    {
                        self();
                    }
                });
            }
        });
        return new CompositeDisposable(disposable, cancelable);
    });
}

With this available you can then do something like:

// run DoWork() when this.Text changes
Observable.FromEventPattern<PropertyChangedEventArgs>(this, "PropertyChanged")
          .Where(x => x.EventArgs.PropertyName.Equals("Text"))
          .Sample(TimeSpan.FromSeconds(1)) // get the latest event in each second
          .ObservableLatestOn(Scheduler.Default) // drop all but the latest event
          .Subscribe(x => DoWork().Wait()); // block to avoid overlap

Remarks

To be honest, you are probably better off avoiding the pure Rx solution here, and instead DON'T call DoWork() directly from a subscriber. I would wrap it with an intermediate dispatching mechanism called from the Subscribe method that handles not calling it if it's already running - the code would be way simpler to maintain.

EDIT:

After thinking on this for a few days, I didn't do any better than some of the other answers here - I'll leave the above for interest, but I think I like Filip Skakun approach the best.

like image 40
James World Avatar answered Sep 21 '22 22:09

James World