I am very new to Rx. Here is a simplified model of a problem I am trying to solve. It looks easy, but I am having a hard time finding the appropriate operators (or some other way of manipulating the streams) to solve it.
Say we have two streams. One emits values frequently; the other far less often. Every time the second observable emits a value, we want to take the latest value emitted by the first observable up to that point and do something with it.
Non-working example:
let stream1 = Rx.Observable
  .interval(100);
let stream2 = Rx.Observable
  .interval(2000)
  .combineLatest(stream1, (stream2Value, stream1Value) => stream1Value)
  .do((stream1Value) => console.log('value:', stream1Value));
stream2.subscribe();
The problem with the above snippet is that it waits for the first value from stream2 and then emits events at the frequency of stream1. What I want is a stream that fires at the rate of stream2 but emits the latest value that stream1 had produced by the time stream2 fired. It sounds as if I need stream1 to become a behavior subject so that I can access its last value when stream2 fires... But maybe there is a simpler solution?
You can use withLatestFrom to do that:
let stream1 = Rx.Observable
  .interval(100);
let stream2 = Rx.Observable
  .interval(2000)
  .withLatestFrom(stream1, (stream2Value, stream1Value) => stream1Value)
  .do((stream1Value) => console.log("value:", stream1Value));
stream2.subscribe();
<script src="https://npmcdn.com/@reactivex/[email protected]/dist/global/Rx.min.js"></script>
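To make the emission rule concrete, here is a rough plain-JavaScript sketch that replays two timestamped event lists and mimics what withLatestFrom does: it emits only when the trigger stream fires, and only once the other stream has produced at least one value. The helper name `simulateWithLatestFrom` and the `{ time, value }` event shape are made up for illustration and are not part of RxJS. The periods (300 ms and 1000 ms) are chosen so that no two events share a timestamp, since the ordering of simultaneous timers is not modelled here.

```javascript
// Hypothetical helper (not an RxJS API): walks a merged timeline and applies
// the withLatestFrom rule to two lists of { time, value } events.
function simulateWithLatestFrom(triggerEvents, sourceEvents) {
  // Tag each event with its origin and merge both lists into one timeline.
  const timeline = [
    ...sourceEvents.map((e) => ({ time: e.time, value: e.value, from: 'source' })),
    ...triggerEvents.map((e) => ({ time: e.time, value: e.value, from: 'trigger' })),
  ].sort((a, b) => a.time - b.time);

  let hasLatest = false; // has the source emitted anything yet?
  let latest;            // most recent source value seen so far
  const output = [];

  for (const event of timeline) {
    if (event.from === 'source') {
      // Source values are only remembered, never emitted on their own.
      latest = event.value;
      hasLatest = true;
    } else if (hasLatest) {
      // A trigger emits the remembered source value.
      output.push({ time: event.time, value: latest });
    }
    // A trigger that fires before any source value is silently dropped.
  }
  return output;
}

// Source ticks every 300 ms with values 0..6 (times 300..2100).
const sourceEvents = Array.from({ length: 7 }, (_, i) => ({
  time: (i + 1) * 300,
  value: i,
}));

// Triggers at 100 ms (before any source value), 1000 ms and 2000 ms.
const triggerEvents = [
  { time: 100, value: 'a' },
  { time: 1000, value: 'b' },
  { time: 2000, value: 'c' },
];

const result = simulateWithLatestFrom(triggerEvents, sourceEvents);
console.log(result);
// → [ { time: 1000, value: 2 }, { time: 2000, value: 5 } ]
```

The output follows the rate of the trigger list, which is the behaviour asked for in the question. With combineLatest, every source event after the first trigger would also produce an output, which is why the original snippet logs at the rate of stream1.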
Also, depending on your use case, you might find the auditTime operator useful.