
combineLatest emit only when one of the streams changes

I have a stream with frequent values and one with slower ones. I want to combine them, but only emit a value when the slower one emits. So combineLatest doesn't work. Like so:

a1
a2
b1
(a2,b1)
a3
a4
a5
b2
(a5,b2)

Currently I'm doing it as follows. Is there a cleaner way?

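import rx.lang.scala.Observable  // assuming RxScala

def withLatest[A, B](fast: Observable[A], slow: Observable[B]): Observable[(A, B)] =
  Observable({ o =>
    // Latest value seen on fast; None until fast has emitted at least once
    var last: Option[A] = None
    fast.subscribe({ a => last = Some(a) })
    // Emit a pair only when slow emits, and only once a fast value exists
    slow.subscribe({ b => last.foreach(a => o.onNext((a, b))) })
  })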

edit: This operator is now in Rx and is called withLatestFrom.
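
For reference, the behaviour in the diagram above can be modelled without any Rx dependency. This is only a sketch of the semantics, not the library's implementation: the interleaved events are represented as a plain List of Either (Left for a fast value, Right for a slow value), and the names WithLatestModel and withLatestModel are made up for illustration.

```scala
object WithLatestModel {
  // Hypothetical pure model: Left(a) is a value from the fast stream,
  // Right(b) is a value from the slow stream, in arrival order.
  def withLatestModel[A, B](events: List[Either[A, B]]): List[(A, B)] = {
    val start = (Option.empty[A], Vector.empty[(A, B)])
    val (_, emitted) = events.foldLeft(start) {
      // A fast value only updates the remembered "latest"; nothing is emitted.
      case ((_, out), Left(a)) => (Some(a), out)
      // A slow value emits a pair, but only if a fast value has been seen.
      case ((last, out), Right(b)) => (last, last.fold(out)(a => out :+ ((a, b))))
    }
    emitted.toList
  }
}

object WithLatestModelDemo {
  // The sequence from the question: a1 a2 b1 a3 a4 a5 b2
  val events: List[Either[String, String]] =
    List(Left("a1"), Left("a2"), Right("b1"), Left("a3"), Left("a4"), Left("a5"), Right("b2"))

  val result: List[(String, String)] = WithLatestModel.withLatestModel(events)
  // result == List(("a2", "b1"), ("a5", "b2"))
}
```

A slow value that arrives before any fast value is dropped in this model, which is a choice made here rather than something stated in the question.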

dtech asked Nov 29 '14 14:11



1 Answer

What you are looking for is a combinator I have called "combinePrev". It doesn't exist in the API, but it turns out to be necessary in many situations. The sample operator comes close, but it doesn't combine the two streams. I've also missed "combinePrev" in RxJS. It turns out that the implementation of "combinePrev" (your "withLatest") is simple and depends only on map and switch: each fast value is mapped to an inner stream that pairs it with subsequent slow values, and switch keeps only the inner stream for the latest fast value.

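def withLatest[A, B](fast: Observable[A], slow: Observable[B]): Observable[(A, B)] = {
  // Share one underlying subscription to slow among concurrent subscribers
  val hotSlow = slow.publish.refCount
  // Pair each fast value with later slow values; switch keeps only the newest inner stream
  fast.map({ a => hotSlow.map({ b => (a, b) }) }).switch
}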

Here is a jsfiddle example of the same operator implemented in RxJS.

As long as the operator is not in Rx, an implicit class lets you write slow.withLatest(fast):

implicit class RXwithLatest[B](slow: Observable[B]) {
  def withLatest[A](fast : Observable[A]) : Observable[(A,B)] = /* see above */
}

Note: slow must be hot. If slow is a cold Observable, withLatest doesn't work.

André Staltz answered Sep 28 '22 16:09
