Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJS: how to combine two streams in one that fires events at a rate of one stream, but using latest values from other stream?

Tags:

rxjs

I am very new to RX. Here is a simple model of a problem I am trying to solve. It looks easy, but I am having a hard time finding appropriate operators (or manipulating streams in some other manner) to solve it.

So let’s say we have two streams. One is emitting values frequently; the other far less so. We want every time the second observable emits a value, take the latest value that has been emitted by the other observable by that point, and do something with it.

Non-working example:

let stream1 = Rx.Observable
    .interval(100);

let stream2 = Rx.Observable
    .interval(2000)
  .combineLatest(stream1, (stream2Value, stream1Value) => stream1Value)
  .do((stream1Value) => console.log('value:', stream1Value));

stream2.subscribe();

The problem with the above snippet is that it will wait until the first emitted value from stream2 and then will start emitting a stream of events at the frequency of stream1. What I want is to get a stream that will fire events at the rate of stream2, but will emit latest values emitted by stream1 by the time stream2 fired. It sounds as if I need stream1 to become a behavior subject so that I could access its last value when stream2 fires... But maybe there is a simpler solution?

like image 527
azangru Avatar asked Jan 21 '17 23:01

azangru


1 Answers

You can use withLatestFrom to do that:

let stream1 = Rx.Observable
  .interval(100);

let stream2 = Rx.Observable
  .interval(2000)
  .withLatestFrom(stream1, (stream2Value, stream1Value) => stream1Value)
  .do((stream1Value) => console.log("value:", stream1Value));

stream2.subscribe();
<script src="https://npmcdn.com/@reactivex/[email protected]/dist/global/Rx.min.js"></script>

Also, depending upon your use case, you might find the auditTime operator useful.

like image 162
cartant Avatar answered Nov 12 '22 04:11

cartant