Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How does RxJava Observable "Iteration" work?

Tags:

java

rx-java

I started to play around with RxJava and ReactFX, and I became pretty fascinated with it. But as I'm experimenting I have dozens of questions and I'm constantly researching for answers.

One thing I'm observing (no pun intended) is of course lazy execution. With my exploratory code below, I noticed nothing gets executed until the merge.subscribe(pet -> System.out.println(pet)) is called. But what fascinated me is when I subscribed a second subscriber merge.subscribe(pet -> System.out.println("Feed " + pet)), it fired the "iteration" again.

What I'm trying to understand is the behavior of the iteration. It does not seem to behave like a Java 8 stream that can only be used once. Is it literally going through each String one at a time and posting it as the value for that moment? And do any new subscribers following any previously fired subscribers receive those items as if they were new?

public class RxTest {

    public static void main(String[] args) {

        Observable<String> dogs = Observable.from(ImmutableList.of("Dasher", "Rex"))
                .filter(dog -> dog.matches("D.*"));

        Observable<String> cats = Observable.from(ImmutableList.of("Tabby", "Grumpy Cat", "Meowmers", "Peanut"));

        Observable<String> ferrets = Observable.from(CompletableFuture.supplyAsync(() -> "Harvey"));

        Observable<String> merge = dogs.mergeWith(cats).mergeWith(ferrets);

        merge.subscribe(pet -> System.out.println(pet));


        merge.subscribe(pet -> System.out.println("Feed " + pet));

    }
}
like image 308
tmn Avatar asked Apr 18 '15 00:04

tmn


People also ask

What is Observable RxJava?

The Observables in RxJava Explained. In simple terms, An Observable is analogous to a speaker that broadcasts the value. It does various tasks and generates some values. An Operator is similar to a translator in that it converts/modifies data from one form to another.

How does RxJava work internally?

Event-Based: The program executes the codes based on the events generated while the program is running. For Example, a Button click triggers an event and then the program's event handler receives this event and does some work. Observable sequences: It will be better understood by the mechanics of it.

How do I combine two Observables in RxJava?

If you want to merge observables of different type you need to use Observable. zip : Observable<String> o1 = Observable. just("a", "b", "c"); Observable<Integer> o2 = Observable.


1 Answers

Observable<T> represents a monad, a chained operation, not the execution of the operation itself. It is descriptive language, rather than the imperative you're used to. To execute an operation, you .subscribe() to it. Every time you subscribe a new execution stream is created from scratch. Do not confuse streams with threads, as subscription are executed synchronously unless you specify a thread change with .subscribeOn() or .observeOn(). You chain new elements to any existing operation/monad/Observable to add new behaviour, like changing threads, filtering, accumulation, transformation, etc. In case your observable is an expensive operation you don't want to repeat on every subscription, you can prevent recreation by using .cache().

To make any asynchronous/synchronous Observable<T> operation into a synchronous inlined one, use .toBlocking() to change its type to BlockingObservable<T>. Instead of .subscribe() it contains new methods to execute operations on each result with .forEach(), or coerce with .first()

Observables are a good tool because they're mostly* deterministic (same inputs always yield same outputs unless you're doing something wrong), reusable (you can send them around as part of a command/policy pattern) and for the most part ignore concurrence because they should not rely on shared state (a.k.a. doing something wrong). BlockingObservables are good if you're trying to bring an observable-based library into imperative language, or just executing an operation on an Observable that you have 100% confidence it's well managed.

Architecting your application around these principles is a change of paradigm that I can't really cover on this answer.

*There are breaches like Subject and Observable.create() that are needed to integrate with imperative frameworks.

like image 61
MLProgrammer-CiM Avatar answered Sep 29 '22 19:09

MLProgrammer-CiM