
Is there an Rx method to repeat the previous value periodically when no values are incoming?

A use case which I have encountered, and I suspect I can't be the only one, is for a method like:

IObservable<T> Observable.RepeatLastValueDuringSilence(this IObservable<T> inner, TimeSpan maxQuietPeriod);

which would return all the future items from the inner observable, but also, if the inner observable doesn't call OnNext for a certain period of time (maxQuietPeriod), it just repeats the last value (until of course inner calls OnCompleted or OnError).

One justification would be a service that needs to send out a status update at regular intervals. For example:

var myStatus = Observable.FromEvent(
    h=>this.StatusUpdate+=h,
    h=>this.StatusUpdate-=h);

var messageBusStatusPinger = myStatus
    .RepeatLastValueDuringSilence(TimeSpan.FromSeconds(1))
    .Subscribe(update => _messageBus.Send(update));

Does something like this exist? Or am I over-estimating its usefulness?

Thanks, Alex

PS: I apologise for any incorrect terminology/syntax, as I'm only just exploring Rx for the first time.

AlexC asked Jul 12 '12 12:07


2 Answers

This is similar to Matthew's solution, but here the timer restarts each time the source emits an element, which I think is more correct (though the difference is unlikely to matter):

public static IObservable<T> RepeatLastValueDuringSilence<T>(this IObservable<T> inner, TimeSpan maxQuietPeriod)
{    
    return inner.Select(x => 
        Observable.Interval(maxQuietPeriod)
                  .Select(_ => x)
                  .StartWith(x)
    ).Switch();
}

And the test:

var source = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(5).Select(_ => "1")
                       .Concat(Observable.Interval(TimeSpan.FromSeconds(1)).Take(5).Select(_ => "2"))
                       .Concat(Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(5).Select(_ => "3"));

source.RepeatLastValueDuringSilence(TimeSpan.FromMilliseconds(200)).Subscribe(Console.WriteLine);

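You should see 1 printed roughly 10 times (5 from the source, the rest repeated during the silence that follows), then lots of 2s, since each one from the source is followed by about 4 more during the silence before the next, and finally 3 repeated indefinitely, because the inner Interval keeps firing after the source has completed.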
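The question asks for the repeats to stop once the inner observable calls OnCompleted, whereas the version above keeps repeating the last value forever. Here is a minimal sketch of one way to add that, assuming Rx 2.0 or later (System.Reactive). The names RepeatLastValueUntilCompleted and ObservableSilenceExtensions are hypothetical, chosen only for this illustration, and the code has not been run against the test above:

```csharp
using System;
using System.Reactive.Linq;

public static class ObservableSilenceExtensions
{
    // Hypothetical variant of RepeatLastValueDuringSilence that also
    // terminates when the source terminates.
    public static IObservable<T> RepeatLastValueUntilCompleted<T>(
        this IObservable<T> inner, TimeSpan maxQuietPeriod)
    {
        // Publish shares a single subscription to the source, so it can
        // drive both the repeating stream and the stop signal.
        return inner.Publish(shared =>
            shared
                // For each source value: emit it at once, then repeat it
                // every maxQuietPeriod until the next value arrives.
                .Select(x => Observable.Interval(maxQuietPeriod)
                                       .Select(_ => x)
                                       .StartWith(x))
                .Switch()
                // LastOrDefaultAsync emits only when the source completes
                // (and errors if the source errors), which ends the repeats.
                .TakeUntil(shared.LastOrDefaultAsync()));
    }
}
```

With the test source above, the 3s should then stop once the source completes rather than continuing indefinitely.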

yamen answered Sep 28 '22 06:09


This fairly simple query does the job:

var query =
    source
        .Select(s =>
            Observable
                .Interval(TimeSpan.FromSeconds(1.0))
                .StartWith(s)
                .Select(x => s))
        .Switch();

Never underestimate the power of .Switch().

Enigmativity answered Sep 28 '22 06:09