I have two streams that signal when some conditions change. I need an Observable that fires true when all the conditions turn true, and false when any of them turns false. If one of the conditions is already false and another changes to false, I don't need to raise an event.
Here is how I'm doing it:
// Current state of each condition
private bool _cond1 = false;
private bool _cond2 = false;

void MyHandlingMethod(IObservable<bool> c1, IObservable<bool> c2)
{
    // Track the latest value of each condition
    c1.Subscribe(b => _cond1 = b);
    c2.Subscribe(b => _cond2 = b);
    // Re-evaluate the combined condition whenever either stream fires
    var c3 = c1.Merge(c2).Select(_ => _cond1 && _cond2);
    c3.Subscribe(b => { /* some action goes here */ });
    // ...
}
I want to know whether this is the right way to solve my problem and whether there are any pitfalls, e.g. the c3 subscription firing before c1 and c2 due to the asynchronous nature of Rx.
There is no need to keep state:
var c3 = c1.CombineLatest(c2, (a, b) => a && b).DistinctUntilChanged();
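To see how this behaves, here is a minimal sketch that drives the query with two Subject<bool> instances standing in for the real condition streams. The class name CombineLatestDemo and the Run helper are made up for illustration, and the code assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class CombineLatestDemo
{
    // Hypothetical helper: feeds a fixed sequence of values into two
    // subjects and returns everything the combined stream emitted.
    public static List<bool> Run()
    {
        var c1 = new Subject<bool>();
        var c2 = new Subject<bool>();
        var seen = new List<bool>();

        // Same query as in the answer: AND of the latest values,
        // with consecutive duplicates dropped.
        var c3 = c1.CombineLatest(c2, (a, b) => a && b).DistinctUntilChanged();

        using (c3.Subscribe(seen.Add))
        {
            c1.OnNext(true);   // nothing emitted: c2 has no value yet
            c2.OnNext(true);   // emits true
            c1.OnNext(false);  // emits false
            c2.OnNext(false);  // suppressed: result is still false
            c1.OnNext(true);   // suppressed: c2 is still false
            c2.OnNext(true);   // emits true
        }

        return seen;
    }

    static void Main()
    {
        foreach (var value in Run())
            Console.WriteLine(value);   // True, False, True
    }
}
```

Note that CombineLatest stays silent until every source has produced at least one value. If the conditions should count as false before their first notification, prepending an initial value with StartWith(false) on each source is one possible way to seed them.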
Honestly, I'd probably go with the CombineLatest approach, but in the interests of touching the somewhat-untouched parts of the Rx framework...
Although it's not a perfect fit, you can use the Observable.When / Observable.And / Observable.Then pattern:
var firstStream = new Subject<bool>();
var secondStream = new Subject<bool>();
var thirdStream = new Subject<bool>();
var fourthStream = new Subject<bool>();

var query = Observable.When(firstStream
    .And(secondStream)
    .And(thirdStream)
    .And(fourthStream)
    .Then((v1, v2, v3, v4) => v1 & v2 & v3 & v4));

using (query.Subscribe(Console.WriteLine))
{
    firstStream.OnNext(true);
    secondStream.OnNext(true);
    thirdStream.OnNext(true);
    // output stream will fire after this statement
    fourthStream.OnNext(true);
    Console.ReadLine();
}
One benefit of this approach is that you only create output events when all streams have data that combine according to the Then clause; it also reads rather nicely. That said, it has one critical failing for your use case: you must have data on each incoming stream to trigger the output stream:
using (query.Subscribe(Console.WriteLine))
{
    firstStream.OnNext(true);
    secondStream.OnNext(true);
    thirdStream.OnNext(true);
    // output stream will fire after this statement
    fourthStream.OnNext(true);

    // this WON'T raise false on the output!
    firstStream.OnNext(false);
    secondStream.OnNext(false);
    thirdStream.OnNext(false);
    // output stream will fire false after this statement
    fourthStream.OnNext(false);
    Console.ReadLine();
}