Here's my attempt at drawing the marble diagram --
STREAM 1 = A----B----C---------D------>
(magical operator)
STREAM 2 = 1----------2-----3-----4--->
STREAM 3 = 1A---------2C----3C----4D-->
I am looking for something that generates stream 3 from streams 1 and 2. Whenever something is emitted from stream 2, it should be combined with the latest item from stream 1. combineLatest is similar to what I want, but I only want stream 3 to emit when stream 2 emits, not when stream 1 does. Does an operator like this exist?
Use of Rx operators. Rx operators are basically functions that define an observable: how and when it should emit its data stream. There are hundreds of operators available in RxJava. You can read an alphabetical list of all the operators available from here.
The Delay operator modifies its source Observable by pausing for a particular increment of time (that you specify) before emitting each of the source Observable's items.
Flowable is the backpressure-enabled base reactive class. Let's understand the use of Flowable with another example. Suppose you have a source that is emitting data items at a rate of 1 million items/second, and the next step is to make a network request for each item.
There are two key types to understand when working with Rx. An Observable represents any object that can get data from a data source and whose state may be of interest, so that other objects may register an interest in it. An Observer is any object that wishes to be notified when the state of another object changes.
There is an operator that does what you need: one overload of sample takes another observable instead of a duration as a parameter. The documentation is here: https://github.com/ReactiveX/RxJava/wiki/Filtering-Observables#sample-or-throttlelast
The usage (I'll give examples in Scala):
import rx.lang.scala.Observable
import scala.concurrent.duration._
// Source emits 0, 1, 2, ... every 100 ms; sampler ticks every 180 ms
def o = Observable.interval(100.milli)
def sampler = Observable.interval(180.milli)
// Often, you just need the sampled observable
o.sample(sampler).take(10).subscribe(x => println(s"$x, "))
Thread.sleep(2000) // keep the main thread alive while the timers emit
// or, as for your use case
o.combineLatest(sampler).sample(sampler).take(10).subscribe(x => println(s"$x, "))
Thread.sleep(2000)
The output:
0,
2,
4,
6,
7,
9,
11,
13,
15,
16,
(2,0),
(4,1),
(6,2),
(7,3),
(9,4),
(11,5),
(13,6),
(15,7),
(16,8),
(18,9),
There is a slight catch in that duplicate entries from the sampled observable are swallowed (see discussion at https://github.com/ReactiveX/RxJava/issues/912). Other than that, I think it is exactly what you are looking for.
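To make that catch concrete, here is a minimal sketch that models plain sample on a simulated timeline instead of using the real library. Everything in it (the Event case class, the integer time ticks, the sample helper, and the choice to treat an item arriving exactly on a tick as part of that tick) is a hypothetical simplification, not the RxJava implementation.

```scala
object SampleSketch {
  // One emission on a simulated timeline (hypothetical model, not an Rx type).
  final case class Event[A](time: Int, value: A)

  // For each sampler tick, emit the latest source item that arrived after the
  // previous tick. If nothing new arrived, that tick emits nothing.
  // Assumes `source` and `ticks` are sorted by time.
  def sample[A](source: List[Event[A]], ticks: List[Int]): List[Event[A]] = {
    val windows = (Int.MinValue :: ticks).zip(ticks) // (previousTick, tick) pairs
    windows.flatMap { case (prev, tick) =>
      source
        .filter(e => e.time > prev && e.time <= tick)
        .lastOption
        .map(e => Event(tick, e.value))
    }
  }

  def main(args: Array[String]): Unit = {
    // Stream 1 from the question, with column positions used as time ticks.
    val stream1 = List(Event(0, "A"), Event(5, "B"), Event(10, "C"), Event(20, "D"))
    // Emission times of stream 2.
    val ticks = List(0, 11, 17, 23)
    sample(stream1, ticks).foreach(println)
  }
}
```

In this model the tick at time 17 produces nothing, because stream 1 has not emitted since the tick at time 11. Pairing the source with the sampler via combineLatest first, as in the second example above, avoids this for the question's use case, since every sampler tick then produces a fresh pair.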
withLatestFrom seems to fit exactly what I was looking for: http://rxmarbles.com/#withLatestFrom
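As a rough illustration of the behaviour being asked for, the sketch below replays the marble diagram from the question on a simulated timeline. It does not call any Rx library: the Event case class, the integer time ticks (taken from column positions in the diagram), and this standalone withLatestFrom function are hypothetical stand-ins, and treating items that share a tick as already available is an assumption.

```scala
object WithLatestFromSketch {
  // One emission on a simulated timeline (hypothetical model, not an Rx type).
  final case class Event[A](time: Int, value: A)

  // For every item of `trigger`, pair it with the most recent item of `other`
  // seen at or before that time. Triggers that fire before `other` has emitted
  // anything are dropped. Assumes both lists are sorted by time.
  def withLatestFrom[A, B](
      trigger: List[Event[A]],
      other: List[Event[B]]
  ): List[Event[(A, B)]] =
    trigger.flatMap { t =>
      other
        .filter(_.time <= t.time)
        .lastOption
        .map(o => Event(t.time, (t.value, o.value)))
    }

  def main(args: Array[String]): Unit = {
    val stream1 = List(Event(0, "A"), Event(5, "B"), Event(10, "C"), Event(20, "D"))
    val stream2 = List(Event(0, 1), Event(11, 2), Event(17, 3), Event(23, 4))
    // Prints 1A, 2C, 3C, 4D on separate lines, matching stream 3 in the question.
    withLatestFrom(stream2, stream1)
      .foreach(e => println(s"${e.value._1}${e.value._2}"))
  }
}
```

Only stream 2 drives the output here: B never appears because stream 2 does not emit while B is the latest item of stream 1.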