
RxJava: Observable and default thread

I have the following code:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull final ObservableEmitter<String> s) throws Exception {
                Thread thread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        s.onNext("1");
                        s.onComplete();
                    }
                });
                thread.setName("background-thread-1");
                thread.start();
            }
        }).map(new Function<String, String>() {
            @Override
            public String apply(@NonNull String s) throws Exception {
                String threadName = Thread.currentThread().getName();
                logger.logDebug("map: thread=" + threadName);
                return "map-" + s;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {}

            @Override
            public void onNext(String s) {
                String threadName = Thread.currentThread().getName();
                logger.logDebug("onNext: thread=" + threadName + ", value=" + s);
            }

            @Override
            public void onError(Throwable e) {}

            @Override
            public void onComplete() {
                String threadName = Thread.currentThread().getName();
                logger.logDebug("onComplete: thread=" + threadName);
            }
        });

And here's the output:

map: thread=background-thread-1 
onNext: thread=background-thread-1, value=map-1 
onComplete: thread=background-thread-1

Important detail: I'm calling the subscribe method from another thread (main thread in Android).

So it looks like Observable is synchronous by default and performs everything (operators like map, plus notifying subscribers) on the thread that emits the events (s.onNext), right? Is that the intended behaviour, or have I misunderstood something? I was expecting that at least the onNext and onComplete callbacks would be called on the caller's thread, not on the one emitting events. Do I understand correctly that in this particular case the caller's thread doesn't matter, at least when events are generated asynchronously?

Another concern: what if I receive an Observable as a parameter from some external source (i.e. I don't create it myself)? There is no way for me as its user to check whether it is synchronous or asynchronous, so I just have to specify explicitly where I want to receive callbacks via subscribeOn and observeOn, right?

Thanks!

fraggjkee asked Apr 16 '17 11:04




1 Answer

RxJava is unopinionated about concurrency. It will produce values on the subscribing thread unless you use some other mechanism such as observeOn/subscribeOn. Please don't use low-level constructs like Thread inside operators; you could break the Observable contract.

Because you start a Thread inside subscribe, onNext is called from that thread ('background-thread-1'). The subscription itself happens on the calling thread (the UI thread). Every operator down the chain is then invoked from 'background-thread-1', and the subscriber's onNext and onComplete are called from 'background-thread-1' as well.
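To make the point above concrete without pulling in RxJava, here is a minimal JDK-only sketch. The Source interface and the EmitterThreadDemo class are hypothetical stand-ins I made up for illustration, not RxJava types. It only shows that a plain callback runs on whichever thread invokes it, which is why the question's log shows 'background-thread-1' everywhere.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class EmitterThreadDemo {

    /** Hypothetical stand-in for Observable.create: hands the downstream callback to the producer. */
    interface Source<T> {
        void subscribe(Consumer<T> downstream);
    }

    /** Subscribes on the calling thread and returns the name of the thread that ran the callback. */
    static String callbackThreadName() {
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        // Same shape as the question: the producer starts its own thread and emits from it.
        Source<String> source = downstream -> {
            Thread thread = new Thread(() -> downstream.accept("1"));
            thread.setName("background-thread-1");
            thread.start();
        };

        // subscribe() itself returns immediately on the calling thread;
        // the callback body runs later, on the emitting thread.
        source.subscribe(value -> {
            seen.set(Thread.currentThread().getName());
            done.countDown();
        });

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        // prints "callback thread=background-thread-1"
        System.out.println("callback thread=" + callbackThreadName());
    }
}
```

Nothing in this sketch ever moves the callback back to the caller's thread, and without observeOn RxJava does not do so either.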

If you want to produce values off the calling thread, use subscribeOn. If you want to switch back to the main thread, use observeOn somewhere in the chain, most likely right before subscribing.

Example:

Observable.just(1, 2, 3) // because of subscribeOn below, the values are emitted on a computation thread
            .subscribeOn(Schedulers.computation()) // decides where the source is subscribed to; if used several times, the one nearest to the source wins
            .map(integer -> integer) // map runs on the computation thread
            .observeOn(AndroidSchedulers.mainThread()) // switches every downstream onNext to the main thread
            .subscribe(integer -> {
                // called on the main thread
            });
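If it helps to picture what the thread switch amounts to, here is a simplified JDK-only model of the hop. This is only a sketch under my own assumptions, not how RxJava implements observeOn; hopTo, ObserveOnSketch and the "target-thread" name are hypothetical. The idea is that each value gets re-posted to an executor, so the downstream callback runs there regardless of which thread emitted it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class ObserveOnSketch {

    /** Hypothetical helper: wraps a callback so every value is re-posted to the given executor. */
    static <T> Consumer<T> hopTo(Executor executor, Consumer<T> downstream) {
        return value -> executor.execute(() -> downstream.accept(value));
    }

    /** Emits one value from "background-thread-1" and returns the name of the thread that received it. */
    static String deliveryThreadName() {
        ExecutorService target =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "target-thread"));
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        Consumer<String> callback = hopTo(target, value -> {
            seen.set(Thread.currentThread().getName());
            done.countDown();
        });

        // The producer still emits on its own thread, as in the question.
        new Thread(() -> callback.accept("1"), "background-thread-1").start();

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            target.shutdown(); // let the JVM exit once the value has been delivered
        }
        return seen.get();
    }

    public static void main(String[] args) {
        // prints "delivered on target-thread"
        System.out.println("delivered on " + deliveryThreadName());
    }
}
```

This is also why, for an Observable you get from somewhere else, applying observeOn yourself is the safe choice: the consumer picks the delivery thread and does not need to know where the source emits.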

Here is a good explanation: http://tomstechnicalblog.blogspot.de/2016/02/rxjava-understanding-observeon-and.html

Hans Wurst answered Nov 15 '22 15:11