When to call the subscribeWith method rather than plain subscribe? And what is the use case?
compositeDisposable.add(get()
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(Schedulers.io())
    .subscribe(this::handleResponse, this::handleError));
VS
compositeDisposable.add(get()
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(Schedulers.io())
    // .subscribe(this::handleResponse, this::handleError);
    .subscribeWith(new DisposableObserver<News>() {
        @Override
        public void onNext(News value) {
            handleResponse(value);
        }
        @Override
        public void onError(Throwable e) {
            handleError(e);
        }
        @Override
        public void onComplete() {
            // dispose here ? why? when the whole thing will get disposed later
            // via compositeDisposable.dispose(); in onDestroy();
        }
    }));
Thank you
According to the documentation, both return disposable SingleObserver instances:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <E extends SingleObserver<? super T>> E subscribeWith(E observer) {
    subscribe(observer);
    return observer;
}
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(final Consumer<? super T> onSuccess, final Consumer<? super Throwable> onError) {
    ObjectHelper.requireNonNull(onSuccess, "onSuccess is null");
    ObjectHelper.requireNonNull(onError, "onError is null");
    ConsumerSingleObserver<T> s = new ConsumerSingleObserver<T>(onSuccess, onError);
    subscribe(s);
    return s;
}
Here, the ConsumerSingleObserver class implements both SingleObserver and Disposable.
To subscribe to a stream, simply call Observable<T>#subscribe(...) and pass in an Observer instance. There may also be some instances where we are no longer interested in the emissions of an Observable.
Subscribing in RxJava. To receive the data emitted from an observable you need to subscribe to it.
RxJava is a JVM library that uses observable sequences to perform asynchronous and event-based programming. Its primary building blocks are triple O's, which stand for Operator, Observer, and Observables. And we use them to complete asynchronous tasks in our project. It greatly simplifies multithreading in our project.
doOnSubscribe: modifies the source so that it invokes the given action when its subscribers subscribe to it. doOnTerminate: calls the specified action just before this Observable signals onError or onComplete.
Observable#subscribe explanation:
In your first code snippet:
.subscribe(this::handleResponse, this::handleError));
You are actually using one of the several overloaded Observable#subscribe methods:
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
There is another one that also takes in an Action to perform onComplete:
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete)
And another option allows you to simply pass in an Observer (NOTE: this is a void method). (Edit 2: this method is defined in ObservableSource, which is the interface that Observable implements.)
public final void subscribe(Observer<? super T> observer)
In the second code snippet in your question, you used the subscribeWith method, which simply returns the Observer you passed in (for convenience, caching, etc.):
public final <E extends Observer<? super T>> E subscribeWith(E observer)
Observer#onComplete explanation:
Observer#onComplete gets called after the Observable has emitted all the items in the stream. From the Javadoc:
/**
 * Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
 * <p>
 * The {@link Observable} will not call this method if it calls {@link #onError}.
 */
void onComplete();
So, for example, if the get() in your code snippets returned an Observable that emitted multiple News objects, each one would be handled in Observer#onNext. Here you can process each item.
After they have all been processed (and assuming no error occurred), onComplete gets called. Here you can perform any extra actions that you need (e.g. update the UI), knowing that you've processed all the News objects.
This is not to be confused with Disposable#dispose, which gets invoked when the observable stream ends (complete/error), or manually by you to terminate the observation (this is where the CompositeDisposable comes in, as it helps you dispose of all the Disposables that it contains at once).
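To make the difference between the two signals concrete, here is a minimal sketch that does not use RxJava at all. CancellableObserver and DisposeSketch.emit are hypothetical stand-ins written only for this illustration, and they assume the simplest possible behavior: once the observer is disposed, the emitter delivers nothing further, including onComplete.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in, NOT an RxJava class: an observer that can also be disposed.
class CancellableObserver<T> {
    final List<T> received = new ArrayList<>();
    boolean completed = false;
    private boolean disposed = false;

    void onNext(T value) { received.add(value); }

    // Signal from the source: "I have nothing more to send."
    void onComplete() { completed = true; }

    // Request from the consumer: "stop sending me anything."
    void dispose() { disposed = true; }

    boolean isDisposed() { return disposed; }
}

class DisposeSketch {
    // Hypothetical emitter: delivers items in order and skips every remaining
    // signal (including onComplete) once the observer has been disposed.
    static <T> void emit(List<T> items, CancellableObserver<T> observer) {
        for (T item : items) {
            if (observer.isDisposed()) {
                return;
            }
            observer.onNext(item);
        }
        if (!observer.isDisposed()) {
            observer.onComplete();
        }
    }

    public static void main(String[] args) {
        List<String> news = Arrays.asList("a", "b", "c");

        // Never disposed: sees every item, then onComplete.
        CancellableObserver<String> all = new CancellableObserver<String>();
        emit(news, all);

        // Disposes itself after two items: sees no third item and no onComplete.
        CancellableObserver<String> firstTwo = new CancellableObserver<String>() {
            @Override
            void onNext(String value) {
                super.onNext(value);
                if (received.size() == 2) {
                    dispose();
                }
            }
        };
        emit(news, firstTwo);

        System.out.println(all.received + " completed=" + all.completed);
        System.out.println(firstTwo.received + " completed=" + firstTwo.completed);
    }
}
```

In this model onComplete is something the source tells you, while dispose is something you tell the source. That is why an empty onComplete is fine in the question's snippet: cleanup in onDestroy() goes through the CompositeDisposable regardless of whether the stream has finished.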
If in your scenario get() returns an Observable that only emits a single item, then instead of using an Observable, consider using an io.reactivex.Single, where you only process the one item (in onSuccess) and won't need to specify an Action for onComplete :)
Edit: response to your comment:
However I still do not get use of subscribeWith, you said it passes the observer for caching etc , where does it pass to? on complete? and from what I understood subscribeWith is not actually consuming the observable( or Single) right?
To further clarify the subscribeWith explanation, what I meant was that it will consume the Observer object that you passed into subscribeWith (exactly like the subscribe method); however, it will additionally return that same Observer right back to you. At the time of writing, the implementation of subscribeWith is:
public final <E extends Observer<? super T>> E subscribeWith(E observer) {
    subscribe(observer);
    return observer;
}
Therefore, subscribeWith can be used interchangeably with subscribe.
Can you give a use case of subscribeWith with example? I guess that will answer the question completely
The subscribeWith Javadoc gives the following usage example:
Observable<Integer> source = Observable.range(1, 10);
CompositeDisposable composite = new CompositeDisposable();
ResourceObserver<Integer> rs = new ResourceObserver<>() {
    // ...
};
composite.add(source.subscribeWith(rs));
Here, subscribeWith returns the same ResourceObserver object that was instantiated. This gives the convenience of performing the subscription and adding the ResourceObserver to the CompositeDisposable in one line (note that ResourceObserver implements Disposable).
Edit 2 Replying to second comment.
source.subscribeWith(rs); source.subscribe(rs); both return SingleObserver instance,
ObservableSource#subscribe(Observer<? super T> observer) does NOT return an Observer. It is a void method (see the NOTE under the Observable#subscribe explanation above), whereas Observable#subscribeWith DOES return the Observer. If we were to rewrite the example usage code using ObservableSource#subscribe instead, we'd have to do it in two lines like so:
source.subscribe(rs); // ObservableSource#subscribe is a void method so nothing will be returned
composite.add(rs);
Whereas the Observable#subscribeWith method made it convenient for us to do the above in just one line: composite.add(source.subscribeWith(rs));
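The two method shapes can also be modeled in plain Java, without RxJava on the classpath. The following is only a sketch: MiniObserver, MiniSource and CollectingObserver are hypothetical stand-ins invented for this illustration, and they mirror just the signatures discussed above (a void subscribe, and a subscribeWith that returns its argument), not the real library's behavior.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins, NOT the RxJava types: just enough to model the two method shapes.
interface MiniObserver<T> {
    void onNext(T value);
    void onComplete();
}

class MiniSource<T> {
    private final List<T> items;

    MiniSource(List<T> items) {
        this.items = items;
    }

    // Same shape as ObservableSource#subscribe(Observer): returns nothing.
    void subscribe(MiniObserver<? super T> observer) {
        for (T item : items) {
            observer.onNext(item);
        }
        observer.onComplete();
    }

    // Same shape as Observable#subscribeWith: subscribe, then return the same observer.
    <E extends MiniObserver<? super T>> E subscribeWith(E observer) {
        subscribe(observer);
        return observer;
    }
}

// An observer that records what it receives so the result can be inspected.
class CollectingObserver<T> implements MiniObserver<T> {
    final List<T> received = new ArrayList<>();
    boolean completed = false;

    @Override
    public void onNext(T value) { received.add(value); }

    @Override
    public void onComplete() { completed = true; }
}

class SubscribeWithSketch {
    public static void main(String[] args) {
        MiniSource<Integer> source = new MiniSource<>(Arrays.asList(1, 2, 3));

        // Two statements: subscribe(...) is void, so the observer needs its own variable.
        CollectingObserver<Integer> first = new CollectingObserver<Integer>();
        source.subscribe(first);

        // One expression: subscribeWith(...) hands the observer straight back,
        // with its concrete type preserved by the generic parameter E.
        CollectingObserver<Integer> second =
                source.subscribeWith(new CollectingObserver<Integer>());

        System.out.println(first.received + " " + second.received);
    }
}
```

Because E is the concrete type of the argument, the returned value keeps whatever extra interface the observer implements. In RxJava that is what lets a DisposableObserver or ResourceObserver be passed directly to CompositeDisposable#add.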
It can get confusing with all the overloaded subscribe methods that look somewhat similar, however there are differences (some of which are subtle). Looking at the code and documentation helps to provide the distinction between them.
The subscribeWith method is useful when you have a specific implementation of an Observer that you may want to reuse. For example, the sample code above provided a specific implementation of ResourceObserver in the subscription, thereby inheriting its functionality while still allowing you to handle onNext, onError and onComplete.
Another example use: for the sample code in your question, what if you wanted to perform the same subscription for the get() response in multiple places?
Instead of copying the Consumer implementations for onNext and onError across different classes, you can define a new class, e.g.:
// sample code..
public class GetNewsObserver extends DisposableObserver<News> {
    // implement your onNext, onError, onComplete.
    ....
}
Now, whenever you do that get() request, you can simply subscribe by doing:
compositeDisposable.add(get() ... .subscribeWith(new GetNewsObserver()));
The code is simpler now: you maintain separation of responsibility for handling the response, and can reuse that GetNewsObserver wherever you want.