Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJava2 .subscribeOn .observeOn confusion. Running on Main thread

I had a method calling a webservice which I thought was running on IO thread until the service stopped and the UI froze.

So I started some simple testing to check threading

implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'io.reactivex.rxjava2:rxjava:2.1.8'


public void test() {

    disposableRx.add(
            Observable.just(1, 2)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doOnNext(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            System.out.println("Emitting item on: " + Thread.currentThread().getName());
                        }
                    })
                    .map(new Function<Integer, Integer>() {
                        @Override
                        public Integer apply(@NonNull Integer integer) throws Exception {
                            System.out.println("Processing item on: " + Thread.currentThread().getName());
                            return integer * 2;
                        }
                    })

                    .subscribeWith(new DisposableObserver<Integer>() {
                        @Override
                        public void onNext(@NonNull Integer integer) {
                            System.out.println("Consuming item on: " + Thread.currentThread().getName());
                        }

                        @Override
                        public void onError(@NonNull Throwable e) {
                        }

                        @Override
                        public void onComplete() {
                        }
                    })
    );
}

Is resulting in the following output indicating everything is running on the main thread, despite having subscribe and observe ?

Emitting item on: main
Processing item on: main
Consuming item on: main
Emitting item on: main
Processing item on: main
Consuming item on: main

BUT If I move the observeOn to immediately before the .subscribeWith as follows...

public void test() {

    disposableRx.add(
            Observable.just(1, 2)
                    .subscribeOn(Schedulers.io())
                    .doOnNext(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            System.out.println("Emitting item on: " + Thread.currentThread().getName());
                        }
                    })
                    .map(new Function<Integer, Integer>() {
                        @Override
                        public Integer apply(@NonNull Integer integer) throws Exception {
                            System.out.println("Processing item on: " + Thread.currentThread().getName());
                            return integer * 2;
                        }
                    })
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribeWith(new DisposableObserver<Integer>() {
                        @Override
                        public void onNext(@NonNull Integer integer) {
                            System.out.println("Consuming item on: " + Thread.currentThread().getName());
                        }

                        @Override
                        public void onError(@NonNull Throwable e) {
                        }

                        @Override
                        public void onComplete() {
                        }
                    })
    );
}

The output is what I am looking for which I must say is confusing me even after reading many blogs about RxJava.

Emitting item on: RxCachedThreadScheduler-1
Processing item on: RxCachedThreadScheduler-1
Emitting item on: RxCachedThreadScheduler-1
Processing item on: RxCachedThreadScheduler-1
Consuming item on: main
Consuming item on: main

I've stripped my original method back until it's pretty much a copy of an example method on a Blog post

Multithreading like a boss

which implies that this should run the loadPerson() on the IO thread, emitting on the main thread. It doesn't.

disposableRx.add(
        repo.loadPersonProfile(id).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(new DisposableMaybeObserver<String>() {

                    @Override
                    public void onSuccess(@NonNull String response) {
                        loadPersonDetailsResponse.setValue(ViewModelResponse.success(response));
                        isLoading.setValue(false);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        loadPersonDetailsResponse.setValue(ViewModelResponse.error(e));
                        isLoading.setValue(false);
                    }

                    @Override
                    public void onComplete() {

                    }
                })
);

Dumping out the thread from within my method shows that it's running on the Main thread?

What's causing this?

like image 270
MartinS Avatar asked Jan 19 '18 10:01

MartinS


2 Answers

The order in which you put observeOn() and sunbscribeOn() , and other operators is very important.

  • subscribeOn() operator tells the source Observable which thread to emit and transform items on.

  • Be careful where you put the observeOn() operator because it changes the thread performing the work! In most cases you probably want to delay switching to the observing thread until the very end of your Rx chain.

    Observable.just("long", "longer", "longest")
        .subscribeOn(Schedulers.computation())
        .observeOn(AndroidSchedulers.mainThread())
        .map(String::length)
        .filter(length -> length == 6)
        .subscribe(length -> System.out.println("item length " + length));
    

here the map, filter and consuming is performed in the main thread

  • observeOn() before map() There is no reason to have observeOn() operator applied above the map() operator. In fact, this code will result in NetworkOnMainThreadException! We do not want to be reading from HTTP response on the main thread — it should be done before we switch back to the main thread.

  • you can also use multiple observeOn() to switch threads like this example.

     Observable.just("long", "longer", "longest")
    .doOnNext(s -> System.out.println("first doOnNext: processing item on thread " + Thread.currentThread().getName()))
    .observeOn(Schedulers.computation())
    .map(String::toString)
    .doOnNext(s -> System.out.println("second doOnNext: processing item on thread " + Thread.currentThread().getName()))
    .observeOn(Schedulers.io())
    .map(String::toString)
    .subscribeOn(Schedulers.newThread())
    .map(String::length)
    .subscribe(length -> System.out.println("received item length " + length + " on thread " + Thread.currentThread().getName()));
    

OUTPUT :

first doOnNext: processing item on thread RxNewThreadScheduler-1
first doOnNext: processing item on thread RxNewThreadScheduler-1
first doOnNext: processing item on thread RxNewThreadScheduler-1
second doOnNext: processing item on thread RxComputationThreadPool-1
second doOnNext: processing item on thread RxComputationThreadPool-1
second doOnNext: processing item on thread RxComputationThreadPool-1
received item length 4 on thread RxCachedThreadScheduler-1
received item length 6 on thread RxCachedThreadScheduler-1
received item length 7 on thread RxCachedThreadScheduler-1

Note according to this answer subscribeOn() does't apply to the downstream operators, Therefore it does not guarantee that your operation is going to be on a different Thread.

subscribeOn effects go upstream and closer to the source of events.

As for your problem I have made a test and here are the results

   private void testRxJava2Async() {
 io.reactivex.Observable.fromCallable(new Callable<String>() {
        @Override
        public String call() throws Exception {

            Log.d(TAG,"callable (expensive assync method) was called on --> "+Thread.currentThread().getName());

            return null;
        }
    })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new Observer<String>() {


                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(String s) {

                    Log.d(TAG,"onNext() was called on --> "+Thread.currentThread().getName());

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {

                }
            });
}

Test Result :

callable (expensive assync method) was called on --> RxCachedThreadScheduler-1
onNext() was called on--> main
like image 199
Bishoy Kamel Avatar answered Nov 08 '22 04:11

Bishoy Kamel


Believe me i understand your confusion, let me explain step by step. Remember this for every subscribe there should be observe. We all know all subscribe should be executed on a worker thread(IO) and observer on Main thread(UI). In the first example the subscribe fired, then you said, for this subscription all updates will be delivered on the main thread, regardless of any more subscriptions during any transformations. However in the second example , this is what you said , observe only when the map transformations on the subscriptions is transformed then run on Main thread. Think of it, as Pre and Post increment in programming.

Hope this makes sense. RX framework is great. but actually requires live practice to get it right. Has you have seen first hand.

To ensure your changes are pushed and performed on the main thread what you need to do is to add a intermediary step to observe the changes.

Integer[] list = {6,3,2,1};
 Observable.just(list).subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.map(value = > value  * 2)
.observeOn(Schedulers.mainThread())
.subscribeWith(...)

The idea is this subscribeOn only accepts to start the processing , such as a NetworkRequest but does not guarantee that values will be pushed to that same thread. observeOn says hey i can receive those values you initially subscribed for. So to make sure the values are transformed on the main thread, you can observe changes on another plain thread , perform a operation(Map| Flatmap etc), then observe those changes this will guarantee only those values are placed on the main thread. Basically what your saying is this, hey perform all the heavy processing in those threads, but hey whenever the computations are done, pass me those values to me in the main thread to continue my work, only then can i accept, therefore no work will ever be done on the main thread.

like image 43
Remario Avatar answered Nov 08 '22 04:11

Remario