
How can I use Reactive Extensions to throttle Events using a max window size?

Scenario:

I am building a UI application that receives notifications from a backend service every few milliseconds. When a new notification arrives, I want to update the UI as soon as possible.

Because many notifications can arrive within a short time, and I only ever care about the latest one, I use the Throttle() method of the Reactive Extensions framework. It lets me ignore any notification that is immediately followed by a newer one, so my UI stays responsive.

Problem:

Say I throttle the stream of notification events to 50ms and the backend sends a notification every 10ms. The Throttle() method will then never emit an event, because each new notification resets its sliding window. I need some additional behaviour here, something like a timeout, so that I still receive at least one event per second or so under such a high throughput of events. How can I do this with Reactive Extensions?
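The starvation described above can be reproduced in virtual time. What follows is only a minimal sketch, assuming the System.Reactive and Microsoft.Reactive.Testing NuGet packages are referenced. The class name ThrottleStarvationDemo and the tick values are made up for illustration, with ticks standing in for milliseconds.

```csharp
using System;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

// Hypothetical demo class, not part of the original question.
public static class ThrottleStarvationDemo
{
    // Returns how many values Throttle lets through while the source keeps firing.
    public static int CountThrottledValues()
    {
        var scheduler = new TestScheduler();
        var results = scheduler.CreateObserver<long>();

        // The source fires every 10 ticks, which is faster than the 50-tick
        // quiet period, so each new value cancels the pending Throttle timer.
        Observable.Interval(TimeSpan.FromTicks(10), scheduler)
            .Throttle(TimeSpan.FromTicks(50), scheduler)
            .Subscribe(results);

        // Run 1000 ticks of virtual time; the source never goes quiet.
        scheduler.AdvanceTo(1000);

        return results.Messages.Count;
    }
}
```

Under these assumptions, ThrottleStarvationDemo.CountThrottledValues() should report that nothing was emitted, since the quiet period never elapses while the source is still firing.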

user3002200 asked Nov 17 '13 18:11


2 Answers

As James stated, Observable.Sample will give you the latest value yielded. However, it does so on a fixed timer, not relative to when the first event of a burst occurred. More importantly, if your sample interval is long (say ten seconds) and your event fires right after a sample is taken, you won't get that new event for almost ten seconds.
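The latency of Sample can be seen with a small virtual-time experiment. This is a sketch only, assuming the System.Reactive and Microsoft.Reactive.Testing NuGet packages are referenced. The class name SampleLatencyDemo and the tick values are hypothetical.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

// Hypothetical demo class, not part of the original answer.
public static class SampleLatencyDemo
{
    // Returns the virtual time at which Sample forwards a value produced at tick 110.
    public static long SampledAt()
    {
        var scheduler = new TestScheduler();
        var results = scheduler.CreateObserver<int>();

        // A single value arrives shortly after the first sampling tick.
        var source = scheduler.CreateColdObservable(
            ReactiveTest.OnNext(110, 1));

        // Sample on a fixed 100-tick timer that starts at subscription.
        source.Sample(TimeSpan.FromTicks(100), scheduler)
            .Subscribe(results);

        scheduler.AdvanceTo(1000);

        // Exactly one value is forwarded; report when it was observed.
        return results.Messages.Single().Time;
    }
}
```

The value is produced at tick 110 but is only forwarded at a later sampling tick, so the delay depends on where the event falls relative to the timer rather than on the event itself.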

If you need something a little tighter, you'll need to implement your own function. I've taken the liberty of doing so. This code could definitely use some clean up, but I believe it does what you've asked for.

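using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class ObservableEx
{
    // Convenience overload that runs both timers on the default scheduler.
    public static IObservable<T> ThrottleMax<T>(this IObservable<T> source, TimeSpan dueTime, TimeSpan maxTime)
    {
        return source.ThrottleMax(dueTime, maxTime, Scheduler.Default);
    }

    // Emits the latest value once the source has been quiet for dueTime, or once
    // maxTime has passed since the first unemitted value, whichever comes first.
    public static IObservable<T> ThrottleMax<T>(this IObservable<T> source, TimeSpan dueTime, TimeSpan maxTime, IScheduler scheduler)
    {
        return Observable.Create<T>(o =>
        {
            // Guards the state shared between source and timer callbacks.
            var gate = new object();
            var hasValue = false;
            T value = default(T);

            var maxTimeDisposable = new SerialDisposable();
            var dueTimeDisposable = new SerialDisposable();

            // Flushes the pending value, if any, and cancels both timers.
            Action action = () =>
            {
                lock (gate)
                {
                    if (hasValue)
                    {
                        maxTimeDisposable.Disposable = Disposable.Empty;
                        dueTimeDisposable.Disposable = Disposable.Empty;
                        o.OnNext(value);
                        hasValue = false;
                    }
                }
            };

            var subscription = source.Subscribe(
                x =>
                {
                    lock (gate)
                    {
                        if (!hasValue)
                        {
                            // First value of a new window: start the maximum-wait timer.
                            maxTimeDisposable.Disposable = scheduler.Schedule(maxTime, action);
                        }

                        hasValue = true;
                        value = x;
                        // Every value restarts the quiet-period timer.
                        dueTimeDisposable.Disposable = scheduler.Schedule(dueTime, action);
                    }
                },
                e => { lock (gate) { o.OnError(e); } },
                () => { lock (gate) { o.OnCompleted(); } }
            );

            // Cancel pending timers together with the source subscription.
            return new CompositeDisposable(subscription, maxTimeDisposable, dueTimeDisposable);
        });
    }
}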

And a few tests...

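using System;
using Microsoft.Reactive.Testing;
using Microsoft.VisualStudio.TestTools.UnitTesting;

[TestClass]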
public class ThrottleMaxTests : ReactiveTest
{
    [TestMethod]
    public void CanThrottle()
    {

        var scheduler = new TestScheduler();
        var results = scheduler.CreateObserver<int>();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1)
            );

        var dueTime = TimeSpan.FromTicks(100);
        var maxTime = TimeSpan.FromTicks(250);

        source.ThrottleMax(dueTime, maxTime, scheduler)
            .Subscribe(results);

        scheduler.AdvanceTo(1000);

        results.Messages.AssertEqual(
            OnNext(200, 1)
            );
    }

    [TestMethod]
    public void CanThrottleWithMaximumInterval()
    {

        var scheduler = new TestScheduler();
        var results = scheduler.CreateObserver<int>();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(175, 2),
            OnNext(250, 3),
            OnNext(325, 4),
            OnNext(400, 5)
            );

        var dueTime = TimeSpan.FromTicks(100);
        var maxTime = TimeSpan.FromTicks(250);

        source.ThrottleMax(dueTime, maxTime, scheduler)
            .Subscribe(results);

        scheduler.AdvanceTo(1000);

        results.Messages.AssertEqual(
            OnNext(350, 4),
            OnNext(500, 5)
            );
    }

    [TestMethod]
    public void CanThrottleWithoutMaximumIntervalInterference()
    {
        var scheduler = new TestScheduler();
        var results = scheduler.CreateObserver<int>();

        var source = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(325, 2)
            );

        var dueTime = TimeSpan.FromTicks(100);
        var maxTime = TimeSpan.FromTicks(250);

        source.ThrottleMax(dueTime, maxTime, scheduler)
            .Subscribe(results);

        scheduler.AdvanceTo(1000);

        results.Messages.AssertEqual(
            OnNext(200, 1),
            OnNext(425, 2)
            );
    }
}
cwharris answered Nov 16 '22 19:11


Don't use Observable.Throttle; use Observable.Sample like this, where the TimeSpan gives the desired minimum interval between updates:

source.Sample(TimeSpan.FromMilliseconds(50))
James World answered Nov 16 '22 20:11