I have a stream with frequent values and one with slower ones. I want to combine them, but only emit a value when the slower one emits, so combineLatest doesn't work.
Like so:
a1
a2
b1
(a2,b1)
a3
a4
a5
b2
(a5,b2)
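The pairing above can be pinned down with a small library-free model. This is only a sketch of the intended semantics, not an Rx implementation: the names LatestPairingModel and pairWithLatest and the Either encoding (Left for the fast stream, Right for the slow one) are assumptions made for illustration.

```scala
object LatestPairingModel {
  // Hypothetical encoding: Left(a) is a value from the fast stream,
  // Right(b) is a value from the slow stream, in arrival order.
  def pairWithLatest[A, B](events: Seq[Either[A, B]]): List[(A, B)] = {
    val (_, out) = events.foldLeft((Option.empty[A], List.empty[(A, B)])) {
      // A fast value only updates the remembered "latest" value.
      case ((_, acc), Left(a)) => (Some(a), acc)
      // A slow value emits a pair, but only if fast has emitted before.
      case ((last, acc), Right(b)) =>
        (last, last.map(a => (a, b) :: acc).getOrElse(acc))
    }
    out.reverse
  }

  def main(args: Array[String]): Unit = {
    val events: Seq[Either[String, String]] = Seq(
      Left("a1"), Left("a2"), Right("b1"),
      Left("a3"), Left("a4"), Left("a5"), Right("b2"))
    println(pairWithLatest(events)) // List((a2,b1), (a5,b2))
  }
}
```

In this model, a slow value that arrives before any fast value is dropped rather than paired with a placeholder.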
Currently I'm doing it as follows. Is there a cleaner way?
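def withLatest[A, B](fast: Observable[A], slow: Observable[B]): Observable[(A, B)] =
  Observable({ o =>
    // Latest value seen on the fast stream; None until it has emitted.
    var last: Option[A] = None
    fast.subscribe({ a => last = Some(a) })
    // Emit only when slow emits, and only once fast has produced a value.
    slow.subscribe({ b => last.foreach(a => o.onNext((a, b))) })
  })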
edit: This operator is now in Rx and is called withLatestFrom.
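A minimal sketch of how the built-in operator might be used from RxScala, assuming a version that ships withLatestFrom with a curried result selector. The object name WithLatestFromExample, the run method, and the use of PublishSubject to drive the streams by hand are assumptions for illustration.

```scala
import rx.lang.scala.Observable
import rx.lang.scala.subjects.PublishSubject
import scala.collection.mutable.ListBuffer

object WithLatestFromExample {
  def run(): List[(String, String)] = {
    // Subjects let us push values by hand in a known order.
    val fast = PublishSubject[String]()
    val slow = PublishSubject[String]()
    val seen = ListBuffer.empty[(String, String)]

    // slow drives the output; each slow value is paired with the
    // most recent fast value at that moment.
    val combined: Observable[(String, String)] =
      slow.withLatestFrom(fast)((b, a) => (a, b))
    combined.subscribe(pair => { seen += pair; () })

    fast.onNext("a1"); fast.onNext("a2")
    slow.onNext("b1")
    fast.onNext("a3"); fast.onNext("a4"); fast.onNext("a5")
    slow.onNext("b2")
    seen.toList
  }

  def main(args: Array[String]): Unit =
    println(run()) // List((a2,b1), (a5,b2))
}
```

Note that the receiver is the slow stream: withLatestFrom emits only when the stream it is called on emits.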
What you are looking for is a combinator I have called "combinePrev", which doesn't exist in the API but turns out to be necessary in many situations. The sample operator comes close, but it doesn't combine the two streams. I've also missed "combinePrev" in RxJS. It turns out that the implementation of "combinePrev" ("withLatest") is simple and depends only on map and switch:
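def withLatest[A, B](fast: Observable[A], slow: Observable[B]): Observable[(A, B)] = {
  // Share a single subscription to slow across the inner observables.
  val hotSlow = slow.publish.refCount
  // For each fast value, listen to slow; switch drops the previous listener.
  fast.map({ a => hotSlow.map({ b => (a, b) }) }).switch
}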
Here is a jsfiddle example of the same operator implemented in RxJS.
As long as the operator is not in Rx, you can use an implicit class so that you can write slow.withLatest(fast):
implicit class RXwithLatest[B](slow: Observable[B]) {
def withLatest[A](fast : Observable[A]) : Observable[(A,B)] = /* see above */
}
Note: slow must be hot. If slow is a cold Observable, withLatest doesn't work.
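Putting the pieces together, the implicit class might look like the sketch below, with the body from the answer filled in and both streams driven by hot PublishSubjects. The names WithLatestSketch, RxWithLatest, and run are assumptions for illustration, and the behaviour is only exercised here for hot, synchronously emitting sources.

```scala
import rx.lang.scala.Observable
import rx.lang.scala.subjects.PublishSubject
import scala.collection.mutable.ListBuffer

object WithLatestSketch {
  implicit class RxWithLatest[B](slow: Observable[B]) {
    def withLatest[A](fast: Observable[A]): Observable[(A, B)] = {
      // Share one subscription to slow; slow is assumed to be hot.
      val hotSlow = slow.publish.refCount
      // Each fast value starts a new inner stream; switch keeps only the newest.
      fast.map(a => hotSlow.map(b => (a, b))).switch
    }
  }

  def run(): List[(String, String)] = {
    val fast = PublishSubject[String]()
    val slow = PublishSubject[String]()
    val seen = ListBuffer.empty[(String, String)]

    slow.withLatest(fast).subscribe(pair => { seen += pair; () })

    fast.onNext("a1"); fast.onNext("a2")
    slow.onNext("b1")
    fast.onNext("a3"); fast.onNext("a4"); fast.onNext("a5")
    slow.onNext("b2")
    seen.toList
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```

Because switch resubscribes to hotSlow on every fast value, a cold slow stream would restart from its first element each time, which is why the note above requires slow to be hot.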