I started to play around with RxJava and ReactFX, and I became pretty fascinated with it. But as I'm experimenting I have dozens of questions and I'm constantly researching for answers.
One thing I'm observing (no pun intended) is of course lazy execution. With my exploratory code below, I noticed nothing gets executed until the merge.subscribe(pet -> System.out.println(pet))
is called. But what fascinated me is when I subscribed a second subscriber merge.subscribe(pet -> System.out.println("Feed " + pet))
, it fired the "iteration" again.
What I'm trying to understand is the behavior of the iteration. It does not seem to behave like a Java 8 stream
that can only be used once. Is it literally going through each String
one at a time and posting it as the value for that moment? And do any new subscribers following any previously fired subscribers receive those items as if they were new?
public class RxTest {
public static void main(String[] args) {
Observable<String> dogs = Observable.from(ImmutableList.of("Dasher", "Rex"))
.filter(dog -> dog.matches("D.*"));
Observable<String> cats = Observable.from(ImmutableList.of("Tabby", "Grumpy Cat", "Meowmers", "Peanut"));
Observable<String> ferrets = Observable.from(CompletableFuture.supplyAsync(() -> "Harvey"));
Observable<String> merge = dogs.mergeWith(cats).mergeWith(ferrets);
merge.subscribe(pet -> System.out.println(pet));
merge.subscribe(pet -> System.out.println("Feed " + pet));
}
}
The Observables in RxJava Explained. In simple terms, An Observable is analogous to a speaker that broadcasts the value. It does various tasks and generates some values. An Operator is similar to a translator in that it converts/modifies data from one form to another.
Event-Based: The program executes the codes based on the events generated while the program is running. For Example, a Button click triggers an event and then the program's event handler receives this event and does some work. Observable sequences: It will be better understood by the mechanics of it.
If you want to merge observables of different type you need to use Observable. zip : Observable<String> o1 = Observable. just("a", "b", "c"); Observable<Integer> o2 = Observable.
Observable<T>
represents a monad, a chained operation, not the execution of the operation itself. It is descriptive language, rather than the imperative you're used to. To execute an operation, you .subscribe()
to it. Every time you subscribe a new execution stream is created from scratch. Do not confuse streams with threads, as subscription are executed synchronously unless you specify a thread change with .subscribeOn()
or .observeOn()
. You chain new elements to any existing operation/monad/Observable to add new behaviour, like changing threads, filtering, accumulation, transformation, etc. In case your observable is an expensive operation you don't want to repeat on every subscription, you can prevent recreation by using .cache()
.
To make any asynchronous/synchronous Observable<T>
operation into a synchronous inlined one, use .toBlocking()
to change its type to BlockingObservable<T>
. Instead of .subscribe()
it contains new methods to execute operations on each result with .forEach()
, or coerce with .first()
Observables are a good tool because they're mostly* deterministic (same inputs always yield same outputs unless you're doing something wrong), reusable (you can send them around as part of a command/policy pattern) and for the most part ignore concurrence because they should not rely on shared state (a.k.a. doing something wrong). BlockingObservables are good if you're trying to bring an observable-based library into imperative language, or just executing an operation on an Observable that you have 100% confidence it's well managed.
Architecting your application around these principles is a change of paradigm that I can't really cover on this answer.
*There are breaches like
Subject
andObservable.create()
that are needed to integrate with imperative frameworks.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With