Reactive: Latest value to be received by IObservable

I know that the following is a blocking call and will return the first value in the observable sequence:

var result = myObservable.First();

Depending on which type of subject I use, this has different implications:

  • Subject - First() blocks until OnNext() is next called, so the value it returns is the latest one at that moment
  • BehaviorSubject - First() blocks until at least one value has been pushed through OnNext(); because BehaviorSubject tracks the last value, the result is the latest value
  • ReplaySubject - First() blocks until at least one value has been pushed through OnNext(), but if many items have already been pushed through OnNext(), it returns the first one in its buffer, which is NOT the last one
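
The difference between the last two cases can be reproduced with a minimal sketch, assuming the System.Reactive package is referenced. The class name and the pushed values are illustrative only. The plain Subject case is left out because First() would block the calling thread until some other thread calls OnNext().

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

class FirstOnSubjectsSketch
{
    static void Main()
    {
        // BehaviorSubject keeps only the most recent value (0 is the seed).
        var behavior = new BehaviorSubject<int>(0);
        behavior.OnNext(1);
        behavior.OnNext(2);
        Console.WriteLine(behavior.First()); // prints 2

        // An unbounded ReplaySubject replays its whole buffer from the start.
        var replay = new ReplaySubject<int>();
        replay.OnNext(1);
        replay.OnNext(2);
        Console.WriteLine(replay.First()); // prints 1
    }
}
```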

Now I am trying to find a consistent way of getting the last value regardless of which underlying Observable is used.

Any ideas?

vdh_ant asked Mar 08 '11 16:03

2 Answers

Based on your other Rx question, I think you want this:

var rootSubject = new ReplaySubject<Types>();
var firstSubject = rootSubject.Where(x => x == Types.First);
var secondSubject = rootSubject.Where(x => x == Types.Second);
var thirdSubject = rootSubject.Where(x => x == Types.Third);
var forthSubject = rootSubject.Where(x => x == Types.Fourth);

var mergedSubject = Observable
        .Merge(firstSubject, secondSubject, thirdSubject, forthSubject)
        .Timeout(TimeSpan.FromSeconds(2), Observable.Return(Types.Error))
        .Replay();

mergedSubject.Connect();

rootSubject.OnNext(Types.First);
rootSubject.OnNext(Types.Second);

var result = mergedSubject.First();

rootSubject.OnNext(Types.Third);
rootSubject.OnNext(Types.Fourth);

Console.WriteLine(String.Format("result - {0}", result));

Now it doesn't matter what kind of Subject is used; they all print "result - First".

If you want the latest value pushed before the call to mergedSubject.First(), use Replay(1):

var mergedSubject = Observable
                .Merge(firstSubject, secondSubject, thirdSubject, forthSubject)
                .Timeout(TimeSpan.FromSeconds(2), Observable.Return(Types.Error))    
                .Replay(1);

In that case, all Subject types print "result - Second".
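
A stripped-down sketch of the same idea, without the Where/Merge/Timeout stages, may make the Replay(1) behaviour easier to check. It assumes the System.Reactive package is referenced; the helper name LatestAfterTwoPushes and the integer values are hypothetical and not part of the code above.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

class LatestValueSketch
{
    // Hypothetical helper: wraps the subject in a one-item replay buffer,
    // pushes two values, then reads the buffered one back with First().
    static int LatestAfterTwoPushes(ISubject<int> subject)
    {
        IConnectableObservable<int> latest = subject.Replay(1);
        using (latest.Connect())
        {
            subject.OnNext(1);
            subject.OnNext(2);
            return latest.First();
        }
    }

    static void Main()
    {
        Console.WriteLine(LatestAfterTwoPushes(new Subject<int>()));          // prints 2
        Console.WriteLine(LatestAfterTwoPushes(new BehaviorSubject<int>(0))); // prints 2
        Console.WriteLine(LatestAfterTwoPushes(new ReplaySubject<int>()));    // prints 2
    }
}
```

Because the one-item buffer sits between the subject and the reader, the replay behaviour of the underlying subject no longer affects which value First() sees.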

Richard Anthony Freeman-Hein answered Nov 15 '22 07:11


Sounds like you're looking for Replay (which, in the latest version, is functionally equivalent to using Multicast with a ReplaySubject):

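// A Subject<int> stands in for the hot source here so that OnNext can be
// called on it; IObservable<int> itself has no OnNext method.
var source = new Subject<int>();

IConnectableObservable<int> sourceWithOneBufferedValue = source.Replay(1);

// Connect subscribes the one-item replay buffer to the source.
IDisposable subscription = sourceWithOneBufferedValue.Connect();

source.OnNext(5);

// A late subscriber still receives the buffered value.
sourceWithOneBufferedValue.Subscribe(x => Console.WriteLine(x));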

Richard Szalay answered Nov 15 '22 07:11