I use RxJava on Android with Retrofit 2, and I have applied subscribeOn(Schedulers.io()) and observeOn(AndroidSchedulers.mainThread()) globally before subscribe(). However, sometimes I would like to call subscribeOn(Schedulers.immediate()) and observeOn(Schedulers.immediate()) to override the Schedulers set earlier and get synchronous processing. But I found that it doesn't work: the work is still processed on the io() thread, and the result is still processed on mainThread(). Why?
subscribeOn and observeOn are two of the most common operators used for threading in RxJava. observeOn controls on which thread the operations that follow it take place.
RxJava Schedulers. Threading in RxJava is done with the help of Schedulers. A Scheduler can be thought of as a thread pool managing one or more threads. Whenever a Scheduler needs to execute a task, it takes a thread from its pool and runs the task on that thread.
If you specify multiple subscribeOn() operators in your chain, only the first one (the one closest to the source) is used and the following ones are ignored, unless the subscribeOn() is used inside flatMap(). In that article's example, the Schedulers.computation() thread pool did the work while Schedulers.newThread() was never used.
The observeOn and subscribeOn methods take as an argument a Scheduler, that, as the name suggests, is a tool that we can use for scheduling individual actions. We'll create our implementation of a Scheduler by using the createWorker method, which returns a Scheduler.Worker. A worker accepts actions and executes them sequentially on a single thread.
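As a rough illustration of "accepts actions and executes them sequentially on a single thread", here is a JDK-only sketch. It does not use RxJava's Scheduler.Worker at all; a single-thread ExecutorService stands in for the worker, and the class name WorkerSketch and the thread name "worker-1" are made up for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class WorkerSketch {

    /** Submits three actions to a one-thread executor and returns what each action logged. */
    static List<String> runActions() {
        // Stand-in for a worker: one thread, tasks run in submission order.
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker-1"));
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        for (int i = 1; i <= 3; i++) {
            final int n = i;
            worker.execute(() -> log.add("action " + n + " on " + Thread.currentThread().getName()));
        }
        worker.shutdown(); // accept no new actions, but finish the queued ones
        try {
            if (!worker.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("worker did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        runActions().forEach(System.out::println);
    }
}
```

Every logged line names the same thread, and the actions appear in the order they were submitted.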
That's just the way RxJava works.
Take a look at this video tutorial, starting at the 12:50 mark. So given the example in the video:
Observable.just(1, 2, 3)
    .subscribeOn(Schedulers.newThread())
    .subscribeOn(Schedulers.io())
    .subscribe(System.out::println);
What happens is that subscribeOn() nests all calls. In this case, subscribeOn(Schedulers.io()) is spawned first and subscribes everything above it on the io thread. But then subscribeOn(Schedulers.newThread()) is spawned next, and it takes priority (since it was called last), subscribing everything above it on a new thread instead. There is no building of a chain of threads. In this example, you are essentially spawning the io thread for no good reason.
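To make the nesting easier to picture, here is a toy model written with plain JDK classes. It is not RxJava and not how RxJava is implemented; Source, subscribeOn and the executor names "newThread" and "io" are all hypothetical stand-ins, assuming only that each subscribeOn hops to its own thread before subscribing to whatever is above it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class SubscribeOnModel {

    /** Toy cold source: emits on whichever thread calls subscribe(). */
    interface Source {
        void subscribe(Consumer<String> observer);

        /** Toy subscribeOn: hop to the executor, then subscribe to the upstream from there. */
        default Source subscribeOn(ExecutorService executor) {
            Source upstream = this;
            return observer -> executor.execute(() -> upstream.subscribe(observer));
        }
    }

    /** Mirrors source.subscribeOn(newThread).subscribeOn(io) and returns the emitting thread's name. */
    static String emittingThread() {
        ExecutorService newThread = Executors.newSingleThreadExecutor(r -> new Thread(r, "newThread"));
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        try {
            // The source reports the name of the thread it emits on.
            Source source = observer -> observer.accept(Thread.currentThread().getName());
            CompletableFuture<String> seen = new CompletableFuture<>();
            // Subscription travels upward: first a hop to "io", then a hop to "newThread".
            source.subscribeOn(newThread).subscribeOn(io).subscribe(seen::complete);
            return seen.join();
        } finally {
            newThread.shutdown();
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(emittingThread()); // prints "newThread"
    }
}
```

In this model the "io" thread only forwards the subscription to "newThread" and does nothing else, which is the "spawned for no good reason" effect described above. A later hop can never undo an earlier one, which is also why adding another subscribeOn further down the chain does not override the first.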
To better handle the subscribeOn() and observeOn() methods, I suggest you take a look at this post by the same author as the video. What he proposes is to use a Transformer to wrap the call to these methods:
Transformer is actually just Func1<Observable<T>, Observable<R>>. In other words: feed it an Observable of one type and it'll return an Observable of another. That's exactly the same as calling a series of operators inline.
This way, you can have a method like so:
<T> Transformer<T, T> applySchedulers() {
    return observable -> observable.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
}
Or, if you want to reuse your transformers, you can have the following setup:
final Transformer schedulersTransformer =
    observable -> observable.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());

@SuppressWarnings("unchecked")
<T> Transformer<T, T> applySchedulers() {
    return (Transformer<T, T>) schedulersTransformer;
}
Then the above example would look like:
Observable.just(1, 2, 3)
    .compose(applySchedulers())
    .subscribe(System.out::println);
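If it helps to see why compose() with a Transformer is "exactly the same as calling a series of operators inline", here is a small JDK-only analogy. It uses java.util.stream.Stream in place of Observable, and the Transformer interface, compose() helper and evensAsText() below are invented for this sketch; they are not the RxJava types.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TransformerSketch {

    /** Analogy of a Transformer: a function from one stream type to another. */
    interface Transformer<T, R> extends Function<Stream<T>, Stream<R>> {}

    /** Analogy of compose(): hand the whole stream to the transformer. */
    static <T, R> Stream<R> compose(Stream<T> source, Transformer<T, R> transformer) {
        return transformer.apply(source);
    }

    /** A reusable bundle of operators, packaged once. */
    static Transformer<Integer, String> evensAsText() {
        return stream -> stream.filter(n -> n % 2 == 0).map(n -> "#" + n);
    }

    /** The same operators written inline. */
    static List<String> inline() {
        return Stream.of(1, 2, 3, 4)
                .filter(n -> n % 2 == 0)
                .map(n -> "#" + n)
                .collect(Collectors.toList());
    }

    /** The same pipeline built through compose(). */
    static List<String> composed() {
        return compose(Stream.of(1, 2, 3, 4), evensAsText()).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(inline().equals(composed())); // prints "true"
    }
}
```

Both versions produce the same list; the transformer only moves the operator calls into one reusable place.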
Hope that helps.