Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Pick Observable latest value when any value is produced by another Observable

I have an Observable generated from a regular .NET event. The Observable is hot - not warm - in the sense that it starts producing values even before any subscription, and everytime someone subscribes it will receive the latest produced value. Let's name this eventStream.

Then I have another Observable, exposed by another class, which represents some state flow, so every new value gives the current state of something managed by that class. This Observable is hot as well. Let's name it stateStream.

Everytime the event sequence produces a new value, I want to pick (I'd say sample, but that might lead to confusion) the latest value provided by state sequence. This should produce a new sequence, combining the two values, then processing them, etc.

This is what I came up with, but it does not seem to work though:

var eventStream = Observable.FromEventPattern<MyEventArgs>(/*...*/);
var stateStream = someDependency.SomeStateStream;

eventStream.Select(eventValue => 
  stateStream
    .Take(1)
    .Select(stateValue => new { Event = eventValue, State = stateValue }))
  .Switch()
  .Do(value => _logger.Trace("{{ {0}, {1} }}", value.Event, value.State))
  .Subscribe(value => /* do something */);

The rationale behind that is taken from other similar scenarios I worked with, where a new value produced by some source causes a new subscription to run, thus a new Observable gets returned, and finally the IObservable<IObservable<...>> gets squashed into a one-dimensional IObservable again using Switch() or some similar operator.
But in this case, from a quick test, there seems to be no new subscription, and only the very first stateStream value gets produced. Instead I'd like to pick the first value (Take(1)) everytime the eventStream fires.

AFAIK, CombineLatest and Zip cannot fit the bill: CombineLatest fires everytime one of the two sequences provides a new value; Zip fires everytime both sequences have a new value available, and tipically this means when the slowest of the two has values. And/Then/When should not be right as well for same reason as Zip.

I've also checked SO thread combining one observable with latest from another observable, but I don't think that can apply here. Only in one of the comments I read

[...] and then Scan acts like a CombineLatest that filters for notifications from only one side

and somehow it sounded familiar, but I could not wrap my head around that.

like image 663
superjos Avatar asked Jun 29 '15 20:06

superjos


2 Answers

I think you want Observable.Sample()

stateSource.Sample(eventSource)
     .Zip(eventSource,...)
like image 63
Aron Avatar answered Sep 29 '22 08:09

Aron


It seems to me that your current solution is actually pretty close, but it sounds like you just need to swap the eventStream & stateStream observables around and then remove the .Take(1).

Try this:

stateStream
    .Select(stateValue => 
        eventStream
            .Select(eventValue => new { Event = eventValue, State = stateValue }))
    .Switch()
    .Do(value => _logger.Trace("{{ {0}, {1} }}", value.Event, value.State))
    .Subscribe(value => { /* do something */ });

Depending on how stateStream is configured you may need to add a .StartWith(...) to it to get the initial values from eventStream, but I think this approach covers your requirements.

like image 36
Enigmativity Avatar answered Sep 29 '22 06:09

Enigmativity