Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Observable: Getting latest value in intervals until source finishes

I'm looking for an observable selector with a signature akin to this:

static IObservable<T> TakeLatest(this IObservable<T> input, TimeSpan interval)

Which should:

  1. Emit the first item as soon as input emits its first item
  2. From then on, in fixed time intervals afterwards, emit the most recent item produced by input
  3. Complete (or fail) whenever input completes (or fails)

In terms of marbles, something like the following - assuming interval = 2 time units:

Time 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
Input A B C D E F (complete)
Output A B D D E E complete (F not emitted anymore)

Is there any out-of-the-box way of doing so, or a reasonably easy selector to produce these results?

like image 896
Bogey Avatar asked Oct 20 '25 05:10

Bogey


2 Answers

This should probably do exactly what you want. I haven't tested it though.

/// <summary>Samples the source observable sequence at each interval,
/// allowing repeated emissions of the same element.</summary>
public static IObservable<T> SampleWithDuplicates<T>(this IObservable<T> source,
    TimeSpan interval, IScheduler scheduler = null)
{
    scheduler ??= DefaultScheduler.Instance;
    return source.Publish(published => Observable
        .Interval(interval, scheduler)
        .WithLatestFrom(published, (_, x) => x)
        .Merge(published.FirstAsync())
        .TakeUntil(published.LastOrDefaultAsync()));
}
like image 83
Theodor Zoulias Avatar answered Oct 21 '25 19:10

Theodor Zoulias


I've done the following now - I think it works, but I'll heave this open in case anyone can think of a more elegant way (or can think of an issue with my current implementation)

static IObservable<T> TakeLatest(this IObservable<T> input, TimeSpan interval, IScheduler scheduler) => input
  .FirstAsync()
  .Select(_ => Observable.Interval(interval, scheduler).StartWith(0))
  .Switch()
  .CombineLatest(input, (a,b) => (a,b))
  .DistinctUntilChanged(x => x.a)
  .Select(x => x.b)
  .TakeUntil(input.LastAsync());
like image 39
Bogey Avatar answered Oct 21 '25 19:10

Bogey



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!