 

Throttle an IObservable based on value

I have an IObservable<String>.

I am trying to detect (and handle) the case where the same string is notified in short succession.

I want a filter/stream/observable such that if the same string is notified more than once within 250ms, it only notifies once.

Not really sure where to start.

Cheetah asked Dec 04 '13 21:12


2 Answers

Here is a fairly compact solution. Your post is a little ambiguous about whether the suppression window should reset as soon as a different value arrives, so I have provided a solution for each interpretation.

Variation 1 - Distinct "in between" values don't reset timer

This is for when you care strictly about the suppression duration and don't care whether there are any "in between" values (as in McGarnagle's solution), i.e. if you get "a", "b", "a" in quick succession, you still want to suppress the second "a". Fortunately, this is easy with GroupByUntil: group by value, close each group once the duration has elapsed, and emit the first element of each group:

    public static IObservable<T> DistinctUntilChanged<T>(
        this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
    {
        if (scheduler == null) scheduler = Scheduler.Default;

        return source.GroupByUntil(k => k,
                                   _ => Observable.Timer(duration, scheduler))
                     .SelectMany(y => y.FirstAsync());
    }

If you are wondering about the method name: I came up with Variation 2b first and kept the same name here so that the same unit tests can be run against it. It probably deserves a better name, such as SuppressDuplicatesWithinWindow.
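To make the Variation 1 behaviour concrete without Rx or a TestScheduler, here is a minimal Rx-free sketch that replays (time, value) pairs and keeps only the values Variation 1 should emit. `Variation1Model` and `SuppressWithinFixedWindow` are hypothetical names used for illustration, and counting an arrival exactly `duration` ticks after the window opened as still inside it is an assumption (none of the unit tests in this answer hit that boundary).

```csharp
using System.Collections.Generic;

// Hypothetical Rx-free model of Variation 1: emitting a value opens a window of
// `duration` ticks for that value; repeats of it are dropped until the window
// expires, no matter which other values arrive in between.
public static class Variation1Model
{
    public static List<(long Time, T Value)> SuppressWithinFixedWindow<T>(
        IEnumerable<(long Time, T Value)> events, long duration)
    {
        var comparer = EqualityComparer<T>.Default; // treats null == null
        var emitted = new List<(long Time, T Value)>();
        // Open time of the current window per value. A list rather than a
        // Dictionary so that null values can be tracked as well.
        var windows = new List<(T Value, long Opened)>();

        foreach (var (time, value) in events)
        {
            int i = windows.FindIndex(w => comparer.Equals(w.Value, value));
            if (i >= 0 && time - windows[i].Opened <= duration)
                continue; // repeat inside the open window (assumed inclusive): suppress

            if (i >= 0) windows.RemoveAt(i); // window has expired: forget it
            windows.Add((value, time));      // open a fresh window for this value
            emitted.Add((time, value));
        }
        return emitted;
    }
}
```

Fed the input of TestWithSeveralValuesVariation1 with a duration of 250, it returns a@100, b@300, c@350 and a@900, matching that test's expectation.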

Variation 2a - "In between" distinct values DO reset timer

This is slightly more complex: now any event belonging to a different group ends the current group. I use the Publish().RefCount() combination so that the source is only subscribed to once, and I have to be very careful with nulls:

public static IObservable<T> DistinctUntilChanged<T>(
    this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
{
    if (scheduler == null) scheduler = Scheduler.Default;

    var sourcePub = source.Publish().RefCount();

    return sourcePub.GroupByUntil(
        k => k,
        x => Observable.Timer(duration, scheduler)
                       .TakeUntil(
                           sourcePub.Where(i => ReferenceEquals(null, i)
                                                ? !ReferenceEquals(null, x.Key)
                                                : !i.Equals(x.Key))))
        .SelectMany(y => y.FirstAsync());
}
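The `Where` predicate above spells out the null handling by hand. As an alternative, `EqualityComparer<T>.Default` already treats two nulls as equal and never calls `Equals` on a null reference, so the predicate could be written as `i => !EqualityComparer<T>.Default.Equals(i, x.Key)`. The tiny helper below (`NullSafe` and its methods are hypothetical names) just checks that both forms give the same answers, including the null cases:

```csharp
using System.Collections.Generic;

public static class NullSafe
{
    // The hand-written check from Variation 2a: does `i` differ from `key`?
    public static bool DiffersManual<T>(T i, T key) =>
        ReferenceEquals(null, i) ? !ReferenceEquals(null, key) : !i.Equals(key);

    // The same question answered by the default equality comparer,
    // which handles null on either side.
    public static bool DiffersDefault<T>(T i, T key) =>
        !EqualityComparer<T>.Default.Equals(i, key);
}
```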

Variation 2b

This is the original approach I tried. I've included it because, now that my refinements have made 2a more complex, it doesn't look so bad by comparison:

It is a variation of Observable.DistinctUntilChanged that accepts a duration. Once an event is emitted, subsequent duplicates arriving within that duration of it are suppressed. If a different event arrives, or a duplicate arrives after the duration has elapsed, it is emitted and the suppression timer restarts from that event.

It works by using the overload of DistinctUntilChanged that accepts an IEqualityComparer. The comparer treats two timestamped events (produced by the Timestamp operator) as equal if their values match and their timestamps are within the specified duration of each other.

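using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static partial class ObservableExtensions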
{
    public static IObservable<T> DistinctUntilChanged<T>(
        this IObservable<T> source, TimeSpan duration, IScheduler scheduler)
    {
        if (scheduler == null) scheduler = Scheduler.Default;

        return source.Timestamp(scheduler)
                     .DistinctUntilChanged(new Comparer<T>(duration))
                     .Select(ts => ts.Value);
    }

    private class Comparer<T> : IEqualityComparer<Timestamped<T>>
    {
        private readonly TimeSpan _duration;

        public Comparer(TimeSpan duration)
        {
            _duration = duration;
        }

        public bool Equals(Timestamped<T> x, Timestamped<T> y)
        {
            if (y.Timestamp - x.Timestamp > _duration) return false;

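            // Equal when both values are null (or the same reference),
            // or when x.Value is non-null and equals y.Value.
            return ReferenceEquals(x.Value, y.Value)
                   || (!ReferenceEquals(null, x.Value)
                       && x.Value.Equals(y.Value));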
        }

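        public int GetHashCode(Timestamped<T> obj)
        {
            // Hash only the value: events that compare equal can carry different
            // timestamps, so including the timestamp would break the
            // Equals/GetHashCode contract. (DistinctUntilChanged only calls Equals.)
            return ReferenceEquals(null, obj.Value) ? 0 : obj.Value.GetHashCode();
        }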
    }
}
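Stripped of Rx, the 2b rule is: compare each event with the last *emitted* event (DistinctUntilChanged only updates its reference value when it emits) and drop it if the values match and it arrived within `duration` of that event. The sketch below models that rule over (time, value) pairs; `Variation2Model` and `SuppressRepeatsWithinDuration` are hypothetical names, and the inclusive boundary mirrors the comparer's `> _duration` check.

```csharp
using System.Collections.Generic;

// Hypothetical Rx-free model of the 2b rule: an event is a duplicate if it
// equals the last emitted event and arrived within `duration` ticks of it.
public static class Variation2Model
{
    public static List<(long Time, T Value)> SuppressRepeatsWithinDuration<T>(
        IEnumerable<(long Time, T Value)> events, long duration)
    {
        var comparer = EqualityComparer<T>.Default; // null == null, no NullReferenceException
        var emitted = new List<(long Time, T Value)>();

        foreach (var ev in events)
        {
            if (emitted.Count > 0)
            {
                // Like DistinctUntilChanged, compare against the last emitted event only.
                var last = emitted[emitted.Count - 1];
                if (ev.Time - last.Time <= duration && comparer.Equals(last.Value, ev.Value))
                    continue; // duplicate within the window: suppress
            }
            emitted.Add(ev);
        }
        return emitted;
    }
}
```

Given the inputs of TestWithSeveralValues, CanHandleNulls and TwoNullDuplicatesWithinDurationAreSupressed with a duration of 250, it produces the sequences those tests expect.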

Here are the unit tests I used (they need the rx-testing and nunit NuGet packages):

using System;
using Microsoft.Reactive.Testing;
using NUnit.Framework;

public class TestDistinct : ReactiveTest
{
    [Test]
    public void DuplicateWithinDurationIsSupressed()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            OnNext(100, "a"),
            OnNext(200, "a"));

        var duration = TimeSpan.FromTicks(250);

        var results = scheduler.CreateObserver<string>();

        source.DistinctUntilChanged(duration, scheduler).Subscribe(results);

        scheduler.AdvanceBy(1000);

        results.Messages.AssertEqual(
            OnNext(100, "a"));
    }

    [Test]
    public void NonDuplicationWithinDurationIsNotSupressed()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            OnNext(100, "a"),
            OnNext(200, "b"));

        var duration = TimeSpan.FromTicks(250);

        var results = scheduler.CreateObserver<string>();

        source.DistinctUntilChanged(duration, scheduler).Subscribe(results);

        scheduler.AdvanceBy(1000);

        results.Messages.AssertEqual(
            OnNext(100,"a"),
            OnNext(200,"b"));
    }

    [Test]
    public void DuplicateAfterDurationIsNotSupressed()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            OnNext(100, "a"),
            OnNext(400, "a"));

        var duration = TimeSpan.FromTicks(250);

        var results = scheduler.CreateObserver<string>();

        source.DistinctUntilChanged(duration, scheduler).Subscribe(results);

        scheduler.AdvanceBy(1000);

        results.Messages.AssertEqual(
            OnNext(100, "a"),
            OnNext(400, "a"));
    }

    [Test]
    public void NonDuplicateAfterDurationIsNotSupressed()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            OnNext(100, "a"),
            OnNext(400, "b"));

        var duration = TimeSpan.FromTicks(250);

        var results = scheduler.CreateObserver<string>();

        source.DistinctUntilChanged(duration, scheduler).Subscribe(results);

        scheduler.AdvanceBy(1000);

        results.Messages.AssertEqual(
            OnNext(100, "a"),
            OnNext(400, "b"));
    }

    [Test]
    public void TestWithSeveralValues()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            OnNext(100, "a"),
            OnNext(200, "a"),
            OnNext(300, "b"),
            OnNext(350, "c"),
            OnNext(450, "b"),
            OnNext(900, "a"));

        var duration = TimeSpan.FromTicks(250);

        var results = scheduler.CreateObserver<string>();

        source.DistinctUntilChanged(duration, scheduler).Subscribe(results);

        scheduler.AdvanceBy(1000);

        results.Messages.AssertEqual(
            OnNext(100, "a"),
            OnNext(300, "b"),
            OnNext(350, "c"),
            OnNext(450, "b"),
            OnNext(900, "a"));
    }

    [Test]
    public void CanHandleNulls()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            OnNext(100, "a"),
            OnNext(400, (string)null),
            OnNext(500, "b"),
            OnNext(600, (string)null),
            OnNext(700, (string)null));

        var duration = TimeSpan.FromTicks(250);

        var results = scheduler.CreateObserver<string>();

        source.DistinctUntilChanged(duration, scheduler).Subscribe(results);

        scheduler.AdvanceBy(1000);

        results.Messages.AssertEqual(
            OnNext(100, "a"),
            OnNext(400, (string)null),
            OnNext(500, "b"),
            OnNext(600, (string)null));
    }

    [Test]
    public void TwoDuplicatesWithinDurationAreSupressed()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            OnNext(100, "a"),
            OnNext(150, "a"),
            OnNext(200, "a"));

        var duration = TimeSpan.FromTicks(250);

        var results = scheduler.CreateObserver<string>();

        source.DistinctUntilChanged(duration, scheduler).Subscribe(results);

        scheduler.AdvanceBy(1000);

        results.Messages.AssertEqual(
            OnNext(100, "a"));
    }

    [Test]
    public void TwoNullDuplicatesWithinDurationAreSupressed()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            OnNext(100, (string)null),
            OnNext(150, (string)null),
            OnNext(200, (string)null));

        var duration = TimeSpan.FromTicks(250);

        var results = scheduler.CreateObserver<string>();

        source.DistinctUntilChanged(duration, scheduler).Subscribe(results);

        scheduler.AdvanceBy(1000);

        results.Messages.AssertEqual(
            OnNext(100, (string)null));
    }
}

Finally, for completeness: Variation 1 passes the following variant of the TestWithSeveralValues test:

    [Test]
    public void TestWithSeveralValuesVariation1()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateColdObservable(
            OnNext(100, "a"),
            OnNext(200, "a"),
            OnNext(300, "b"),
            OnNext(350, "c"),
            OnNext(450, "b"),
            OnNext(900, "a"));

        var duration = TimeSpan.FromTicks(250);

        var results = scheduler.CreateObserver<string>();

        source.DistinctUntilChanged(duration, scheduler).Subscribe(results);

        scheduler.AdvanceBy(1000);

        results.Messages.AssertEqual(
            OnNext(100, "a"),
            OnNext(300, "b"),
            OnNext(350, "c"),
            OnNext(900, "a"));
    }

And in the null test (CanHandleNulls), the final assertion becomes:

        results.Messages.AssertEqual(
            OnNext(100, "a"),
            OnNext(400, (string)null),
            OnNext(500, "b"),
            OnNext(700, (string)null)); /* This line changes */
James World answered Nov 03 '22 05:11


You're looking for Observable.Throttle. Its documentation says it:

Ignores the values from an observable sequence which are followed by another value before due time with the specified source and dueTime.
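To see why plain Throttle is not enough here, this Rx-free sketch models the documented behaviour over a finite list of (time, value) events: a value survives only if nothing follows it within `dueTime`, and it is emitted `dueTime` after it arrived. `ThrottleModel` is a hypothetical name, and the sketch assumes the source stays open (Rx's Throttle also releases a pending value immediately when the source completes).

```csharp
using System.Collections.Generic;

// Hypothetical model of Observable.Throttle over a finite list of events:
// a value is dropped if ANY other value follows it within dueTime; otherwise
// it is emitted at (arrival time + dueTime). Source completion is not modelled.
public static class ThrottleModel
{
    public static List<(long Time, T Value)> Throttle<T>(
        IReadOnlyList<(long Time, T Value)> events, long dueTime)
    {
        var result = new List<(long Time, T Value)>();
        for (int i = 0; i < events.Count; i++)
        {
            bool followedTooSoon = i + 1 < events.Count
                && events[i + 1].Time - events[i].Time < dueTime;
            if (!followedTooSoon)
                result.Add((events[i].Time + dueTime, events[i].Value));
        }
        return result;
    }
}
```

With a@0, b@100, a@150, a@600 and a dueTime of 250, only a@400 and a@850 come out: the "b" is lost because a *different* value followed it, which is the per-key problem the edit below deals with.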

Edit

OK, so the above throttles the sequence as a whole, not per key as the OP asked. I thought per-key throttling would be an easy next step, but apparently not. (F# has a split function that would help here, but there is apparently no direct C# equivalent.)

So, here is an attempt at implementing Split:

using System;
using System.Collections.Concurrent;
using System.Reactive.Subjects;

public static class Extension
{
    public static IDisposable SplitSubscribe<T, TKey>(
        this IObservable<T> source, 
        Func<T, TKey> keySelector, 
        Action<IObservable<TKey>> subscribe)
    {
        // maintain a list of Observables, one for each key (TKey)
        var observables = new ConcurrentDictionary<TKey, Subject<TKey>>();

        // function to create a new Subject
        Func<TKey, Subject<TKey>> createSubject = key =>
        {
            Console.WriteLine("Added for " + key);
            var retval = new Subject<TKey>();
            subscribe(retval);
            retval.OnNext(key);
            return retval;
        };

        // function to update an existing Subject
        Func<TKey, Subject<TKey>, Subject<TKey>> updateSubject = (key, existing) =>
        {
            Console.WriteLine("Updated for " + key);
            existing.OnNext(key);
            return existing;
        };

        return source.Subscribe(next =>
        {
            var key = keySelector(next);
            observables.AddOrUpdate(key, createSubject, updateSubject);
        });
        // TODO dispose of all subscribers
    }

    // special case: key selector is just the item pass-through
    public static IDisposable SplitSubscribe<T>(
        this IObservable<T> source, 
        Action<IObservable<T>> subscribe)
    {
        return source.SplitSubscribe(item => item, subscribe);
    }
}

With this function, you can split a source observable by key and then throttle each resulting stream. Usage looks like this:

IObservable<string> dummyObservable = new string[] { "a", "b", "a", "b", "b", "c", "a" }.ToObservable();

dummyObservable.SplitSubscribe(next => 
    next.Throttle(TimeSpan.FromMilliseconds(250)).Subscribe(Console.WriteLine));

Output (the original order is not preserved):

Added for a
Added for b
Updated for a
Updated for b
Updated for b
Added for c
Updated for a
a
c
b
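For comparison, here is an Rx-free sketch of what split-then-throttle computes: a value is dropped only when the *same* value arrives again within `dueTime`, and each survivor comes out `dueTime` after it arrived, ordered by those emission times. `PerKeyThrottleModel` and `ThrottlePerKey` are hypothetical names; the sketch ignores threading and assumes the per-key streams never complete (SplitSubscribe as written never completes its subjects).

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical model of "split by value, then Throttle each part".
public static class PerKeyThrottleModel
{
    public static List<(long Time, T Value)> ThrottlePerKey<T>(
        IReadOnlyList<(long Time, T Value)> events, long dueTime)
    {
        var comparer = EqualityComparer<T>.Default;
        var result = new List<(long Time, T Value)>();
        for (int i = 0; i < events.Count; i++)
        {
            // Find the next event carrying the same value, if any.
            int next = -1;
            for (int j = i + 1; j < events.Count; j++)
                if (comparer.Equals(events[j].Value, events[i].Value)) { next = j; break; }

            // Only a repeat of the same value within dueTime cancels this one.
            bool followedTooSoon = next >= 0 && events[next].Time - events[i].Time < dueTime;
            if (!followedTooSoon)
                result.Add((events[i].Time + dueTime, events[i].Value));
        }
        // Survivors come out when their per-key timers fire (stable sort keeps ties in input order).
        return result.OrderBy(r => r.Time).ToList();
    }
}
```

With a@0, b@50, a@100, b@600 and a dueTime of 250, this yields b@300, a@350, b@850. Note the design difference from the GroupByUntil/DistinctUntilChanged approaches in the other answer: throttling keeps the *last* value of each burst and delays it, whereas those keep the *first* value and emit it immediately.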
McGarnagle answered Nov 03 '22 06:11