I have the following code:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull final ObservableEmitter<String> s) throws Exception {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
s.onNext("1");
s.onComplete();
}
});
thread.setName("background-thread-1");
thread.start();
}
}).map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
String threadName = Thread.currentThread().getName();
logger.logDebug("map: thread=" + threadName);
return "map-" + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {}
@Override
public void onNext(String s) {
String threadName = Thread.currentThread().getName();
logger.logDebug("onNext: thread=" + threadName + ", value=" + s);
}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {
String threadName = Thread.currentThread().getName();
logger.logDebug("onComplete: thread=" + threadName);
}
});
And here's the output:
map: thread=background-thread-1
onNext: thread=background-thread-1, value=map-1
onComplete: thread=background-thread-1
Important detail: I'm calling the subscribe
method from another thread (main
thread in Android).
So looks like Observable
class is synchronous and by default and it performs everything (operators like map
+ notifying subscribers) on the same thread which emits events (s.onNext
), right? I wonder... is it intended behaviour or I just misunderstood something? Actually I was expecting that at least onNext
and onComplete
callbacks will be called on the caller's thread, not on the one emitting events. Do I understand correctly that in this particular case actual caller's thread doesn't matter? At least when events are generated asynchronously.
Another concern - what if I receive some Observable as a parameter from some external source (i.e. I don't generate it on my own)... there is no way for me as its user to check if whether it is synchronous or asynchronous and I just have to explicitly specify where I want to receive callbacks via subscribeOn
and observeOn
methods, right?
Thanks!
Observable collections were originally created for UI data binding, which is single-threaded, and that's why the observable collection pattern doesn't extend well to multi-threaded scenarios.
Rx is single-threaded by default. It implies that an Observable and the chain of operators that we can apply to it. That will notify its observers on the same thread on which its subscribe() method is called. The observeOn and subscribeOn methods take as an argument a Scheduler, that, as the name suggests.
By default, nothing in RxJava is multi-threaded. Multi-threading can easily be introduced, however, by using Schedulers. For example, if you did this: Observable.
A Single is something like an Observable, but instead of emitting a series of values — anywhere from none at all to an infinite number — it always either emits one value or an error notification.
RxJava is unopinionated about concurrency. It will produce values on the subscribing thread if you do not use any other mechanisem like observeOn/ subscribeOn. Please don't use low-level constructs like Thread in operators, you could break the contract.
Due to the use of Thread, the onNext will be called from the calling Thread ('background-thread-1'). The subscription happens on the calling (UI-Thread). Every operator down the chain will be called from 'background-thread-1'-calling-Thread. The subscription onNext will also be called from 'background-thread-1'.
If you want to produce values not on the calling thread use: subscribeOn. If you want to switch the thread back to main use observeOn somewhere in the chain. Most likely before subscribing to it.
Example:
Observable.just(1,2,3) // creation of observable happens on Computational-Threads
.subscribeOn(Schedulers.computation()) // subscribeOn happens only once in chain. Nearest to source wins
.map(integer -> integer) // map happens on Computational-Threads
.observeOn(AndroidSchedulers.mainThread()) // Will switch every onNext to Main-Thread
.subscribe(integer -> {
// called from mainThread
});
Here is a good explanitation. http://tomstechnicalblog.blogspot.de/2016/02/rxjava-understanding-observeon-and.html
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With