Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Is there an Rx operator for combining the latest from streams 1 and 2 only when stream 2 emits things?

Here's my attempt at drawing the marble diagram --

STREAM 1 = A----B----C---------D------> (magical operator) STREAM 2 = 1----------2-----3-----4---> STREAM 3 = 1A---------2C----3C----4D-->

I am basically looking for something that generates stream 3 from streams 1 and 2. Basically, whenever something is emitted from stream 2, it combines it with the latest from stream 1. combineLatest is similar to what I want but I only want things emitted from stream 3 when something is emitted from stream 2, not stream 1. Does an operator like this exist?

like image 760
Matthew Avatar asked Aug 19 '14 02:08

Matthew


People also ask

What are Rx operators?

Use of Rx operators. With RxOperator are basically a function that defines the observable, how and when it should emit the data stream. There are hundreds of operators available in RxJava. You can read an alphabetical list of all the operators available from here.

Which operator allows you to wait for a defined delay until an item is emitted?

The Delay operator modifies its source Observable by pausing for a particular increment of time (that you specify) before emitting each of the source Observable's items.

What is flowable RxJava?

Flowable be the backpressure-enabled base reactive class. Let's understand the use of Flowable using another example. Suppose you have a source that is emitting data items at a rate of 1 Million items/second. The next step is to make network request on each item.

What is RX Observable?

There are two key types to understand when working with Rx: Observable represents any object that can get data from a data source and whose state may be of interest in a way that other objects may register an interest. An observer is any object that wishes to be notified when the state of another object changes.


2 Answers

There is an operator that does what you need: One overload of sample takes another observable instead of duration as a parameter. The documentation is here: https://github.com/ReactiveX/RxJava/wiki/Filtering-Observables#sample-or-throttlelast

The usage (I'll give examples in scala):

import rx.lang.scala.Observable
import scala.concurrent.duration
import duration._

def o = Observable.interval(100.milli)
def sampler = Observable.interval(180.milli)

// Often, you just need the sampled observable
o.sample(sampler).take(10).subscribe(x ⇒ println(x +  ", "))
Thread.sleep(2000)
// or, as for your use case
o.combineLatest(sampler).sample(sampler).take(10).subscribe(x ⇒ println(x +  ", "))
Thread.sleep(2000)

The output:

0, 
2, 
4, 
6, 
7, 
9, 
11, 
13, 
15, 
16, 

(2,0), 
(4,1), 
(6,2), 
(7,3), 
(9,4), 
(11,5), 
(13,6), 
(15,7), 
(16,8), 
(18,9),

There is a slight catch in that duplicate entries from the sampled observable are swallowed (see discussion at https://github.com/ReactiveX/RxJava/issues/912). Other than that, I think it is exactly what you are looking for.

like image 189
Tomáš Dvořák Avatar answered Oct 05 '22 06:10

Tomáš Dvořák


withLatestFrom seems to fit exactly what I was looking for - http://rxmarbles.com/#withLatestFrom

like image 27
Matthew Avatar answered Oct 05 '22 06:10

Matthew