Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJava subscribeOn and observeOn not override the original Scheduler set before?

I used RxJava in android with Retrofit 2 and I have invoked subscribeOn(Schedulers.io()) android observeOn(AndroidSchedulers.mainThread()) global before subscribe(). However, sometime I would like to call subscribeOn(Schedulers.immediate()) android observeOn(Schedulers.immediate()) to override the Schedulers set before to get synchronized process. But I found it doesn't work, android works would be still processed on io() thread, android result processed by mainThread(). Why?

like image 820
AtanL Avatar asked Feb 27 '16 01:02

AtanL


People also ask

What do subscribeon and observeon do in RxJava for threading?

We have used two of the most common operators subscribeOn and observeOn used in RxJava for threading. Let’s explore what these operators actually do using some examples. Let’s first print out some outputs, then we’ll see how they work. observeOn controls on which thread following operations will take place. In our main example:

What is scheduler in RxJava?

RxJava Schedulers. Threading in RxJava is done with help of Schedulers. Scheduler can be thought of as a thread pool managing 1 or more threads. Whenever a Scheduler needs to execute a task, it will take a thread from its pool and run the task in that thread.

What happens if you have multiple subscribeon() RxJava operators in your chain?

If you specify multiple subscribeOn () RxJava operators in your chain, only the first one will be used and the following ones will be ignored unless the subscribeOn () is used inside flatMap () as seen above. Note that Schedulers.computation () thread pool above did the work while Schedulers.newThread () was never used.

How do I use the observeon and subscribeon methods?

The observeOn and subscribeOn methods take as an argument a Scheduler, that, as the name suggests, is a tool that we can use for scheduling individual actions. We'll create our implementation of a Scheduler by using the createWorker method, which returns a Scheduler.Worker. A worker accepts actions and executes them sequentially on a single thread.


1 Answers

That's just the way RxJava works.

Take a look at this video tutorial, starting at the 12:50 mark. So given the example in the video:

Observable.just(1, 2, 3)
    .subscribeOn(Schedulers.newThread())
    .subscribeOn(Schedulers.io())
    .subscribe(System.out::println);

What happens is that subscribeOn() nests all calls. In this case subscribeOn(Schedulers.io()) is spawned first and subscribes everything above it on the io thread. But then subscribeOn(Schedulers.newThread()) is spawned next and it takes priority (since it was called last) to subscribe everything on it instead. There is no building a chain of threads. In this example, you are essentially spawning the io thread for no good reason.

To better handle the subscribeOn() and observeOn() methods, I suggest you take a look at this post from the same author of the video. What he is proposing is to use a Transformer to wrap the call to these methods:

Transformer is actually just Func1<Observable<T>, Observable<R>>. In other words: feed it an Observable of one type and it'll return an Observable of another. That's exactly the same as calling a series of operators inline.

This way, you can have a method like so:

<T> Transformer<T, T> applySchedulers() {  
    return observable -> observable.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
}

Or, if you want to reuse your transformers, you can have the following setup:

final Transformer schedulersTransformer =  
    observable -> observable.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());

@SuppressWarnings("unchecked")
<T> Transformer<T, T> applySchedulers() {  
    return (Transformer<T, T>) schedulersTransformer;
}

Then the above example would look like:

Observable.just(1, 2, 3)
    .compose(applySchedulers())
    .subscribe(System.out::println);

Hope that helps.

like image 94
Ricardo Avatar answered Sep 28 '22 03:09

Ricardo