I have an Observable<List<Foo>> getFoo() that is created from a Retrofit service. After calling .getFoo(), I need to share it with multiple subscribers. Calling .share(), however, causes the network call to be re-executed. The replay operator does not work either. I know that a potential solution might be .cache(), but I do not know why this behaviour occurs.
    // Create an instance of our GitHub API interface.
    Retrofit retrofit = new Retrofit.Builder()
            .baseUrl(API_URL)
            .addConverterFactory(GsonConverterFactory.create())
            .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
            .build();

    // Create a call instance for looking up Retrofit contributors.
    Observable<List<Contributor>> testObservable = retrofit
            .create(GitHub.class)
            .contributors("square", "retrofit")
            .share();

    Subscription subscription1 = testObservable
            .subscribe(new Subscriber<List<Contributor>>() {
                @Override
                public void onCompleted() {
                }

                @Override
                public void onError(Throwable throwable) {
                }

                @Override
                public void onNext(List<Contributor> contributors) {
                    System.out.println(contributors);
                }
            });

    Subscription subscription2 = testObservable
            .subscribe(new Subscriber<List<Contributor>>() {
                @Override
                public void onCompleted() {
                }

                @Override
                public void onError(Throwable throwable) {
                }

                @Override
                public void onNext(List<Contributor> contributors) {
                    System.out.println(contributors + " -> 2");
                }
            });

    subscription1.unsubscribe();
    subscription2.unsubscribe();
The code above reproduces the behaviour. If you debug it, you can see that the two lists received are different object instances (they live at different memory addresses), which shows that the request ran twice.
I have also looked at ConnectableObservable as a potential solution, but that requires me to carry the original observable around and call .connect() each time I want to add a new subscriber.
This behaviour with .share() worked fine up to Retrofit 1.9. It stopped working with the Retrofit 2 beta. I have not yet tested it with the Retrofit 2 release version, which was released a few hours ago.
EDIT: 01/02/2017
For future readers, I have written an article here explaining more about the case!
An Observable can have multiple subscribers. When an Observable emits an item, each subscriber's onNext() method is invoked. When an Observable finishes emitting items, each subscriber's onComplete() method is invoked (onCompleted() in RxJava 1). If an Observable emits an error, each subscriber's onError() method is invoked.
A Single is an Observable variant that emits exactly one item or an error. Because it emits only one value, some operators make no sense for it.
The Subscribe operator is the glue that connects an observer to an Observable. In order for an observer to see the items being emitted by an Observable, or to receive error or completed notifications from the Observable, it must first subscribe to that Observable with this operator.
After checking back with RxJava developer Dávid Karnok I'd like to propose a full explanation of what was going on here.
share() is defined as publish().refCount(), i.e. the source Observable is first transformed into a ConnectableObservable by publish(), but instead of having to call connect() "manually", that part is handled by refCount(). In particular, refCount will call connect() on the ConnectableObservable when it itself receives the first subscription; then, as long as there is at least one subscriber, it will stay subscribed; and, finally, when the number of subscribers drops to 0, it will unsubscribe upwards. With cold Observables, like the ones returned by Retrofit, this will stop any running computations.
If, after one of these cycles, another subscriber comes along, refCount will again call connect() and thus trigger a new subscription to the source Observable. In this case, it will trigger another network request.
Now, this usually did not become apparent with Retrofit 1 (and indeed any version before this commit), because these older versions of Retrofit by default moved all network requests to another thread. This usually meant that all your subscribe() calls would happen while the first request/Observable was still running, so the new Subscribers would simply be added to the refCount and would not trigger additional requests/Observables.
Newer versions of Retrofit, however, no longer move the work to another thread by default; you have to do that explicitly by calling, for example, subscribeOn(Schedulers.io()). If you don't, everything will just stay on the current thread, meaning that the second subscribe() will only happen after the first Observable has called onCompleted, and therefore after all Subscribers have unsubscribed and everything is shut down. Now, as we saw in the first paragraph, when the second subscribe() is called, share() has no choice but to cause another Subscription to the source Observable and trigger another network request.
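The two timings described above can be illustrated with a small stand-alone sketch. This is a toy model in plain Java, not RxJava code and not how refCount() is actually implemented: the class RefCountModel and its methods are hypothetical names invented for this illustration, and "connecting" is reduced to incrementing a counter that stands in for a network request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of share(): counts how often the cold source is (re)started. Not RxJava code. */
final class RefCountModel {
    private final List<Consumer<String>> subscribers = new ArrayList<>();
    private int connects = 0; // stands in for the number of network requests

    /** Adds a subscriber; the first subscriber of a cycle "connects" to the source. */
    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
        if (subscribers.size() == 1) {
            connects++; // subscriber count went 0 -> 1: a new request starts
        }
    }

    /** Simulates the response arriving: deliver it, then drop all subscribers. */
    void completeRequest(String response) {
        for (Consumer<String> s : new ArrayList<>(subscribers)) {
            s.accept(response);
        }
        subscribers.clear(); // subscriber count is back to 0, the cycle is over
    }

    int connects() {
        return connects;
    }

    /** Synchronous timing: the request finishes before the second subscribe(). */
    static int sequentialConnects() {
        RefCountModel shared = new RefCountModel();
        shared.subscribe(r -> { });
        shared.completeRequest("contributors");
        shared.subscribe(r -> { });
        shared.completeRequest("contributors");
        return shared.connects();
    }

    /** Asynchronous timing: both subscribers arrive while the request is in flight. */
    static int overlappingConnects() {
        RefCountModel shared = new RefCountModel();
        shared.subscribe(r -> { });
        shared.subscribe(r -> { });
        shared.completeRequest("contributors");
        return shared.connects();
    }

    public static void main(String[] args) {
        System.out.println("sequential:  " + sequentialConnects());  // prints "sequential:  2"
        System.out.println("overlapping: " + overlappingConnects()); // prints "overlapping: 1"
    }
}
```

In this model the only difference between the two cases is whether the second subscribe() happens before or after the first request completes, which is the difference between the old and new Retrofit defaults described above.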
So, to go back to the behavior you are used to from Retrofit 1, just add subscribeOn(Schedulers.io()).
This should result in only one network request being executed, most of the time. In principle, though, you could still get multiple requests (and you always could have with Retrofit 1), but only if your network requests are extremely fast and/or the subscribe() calls happen with considerable delay, so that, again, the first request has finished by the time the second subscribe() happens.
Therefore, Dávid suggests using either cache() (but it has the drawbacks you mentioned) or replay().autoConnect(). According to these release notes, autoConnect works like only the first half of refCount, or more precisely, it is
"similar in behavior to refCount(), except that it doesn't disconnect when subscribers are lost."
This means the request would only be triggered when the first subscribe() happens, but then all later Subscribers would receive all emitted items, regardless of whether there were, at any time in between, 0 subscribers.
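To make the contrast with refCount concrete, here is a similar toy sketch of the replay-and-connect-once behaviour. Again, this is plain Java written for illustration only: ReplayAutoConnectModel and its methods are hypothetical names, and the model ignores threading, errors and completion, all of which the real replay().autoConnect() handles.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of replay().autoConnect(): connect once, buffer items, never disconnect. */
final class ReplayAutoConnectModel {
    private final List<String> buffer = new ArrayList<>();               // items to replay
    private final List<Consumer<String>> subscribers = new ArrayList<>();
    private boolean connected = false;
    private int connects = 0; // stands in for the number of network requests

    /** Replays buffered items to the new subscriber; only the very first call connects. */
    void subscribe(Consumer<String> subscriber) {
        buffer.forEach(subscriber);
        subscribers.add(subscriber);
        if (!connected) {
            connected = true;
            connects++;
        }
    }

    /** Removing a subscriber never tears down the connection, even at 0 subscribers. */
    void unsubscribe(Consumer<String> subscriber) {
        subscribers.remove(subscriber);
    }

    /** Simulates the source emitting an item: buffer it and deliver it to current subscribers. */
    void emit(String item) {
        buffer.add(item);
        for (Consumer<String> s : new ArrayList<>(subscribers)) {
            s.accept(item);
        }
    }

    int connects() {
        return connects;
    }

    /** A late subscriber arrives after the subscriber count has dropped to 0. */
    static List<String> demo() {
        ReplayAutoConnectModel shared = new ReplayAutoConnectModel();
        List<String> log = new ArrayList<>();
        Consumer<String> first = item -> log.add("first:" + item);
        shared.subscribe(first);
        shared.emit("contributors");
        shared.unsubscribe(first); // 0 subscribers from here on
        shared.subscribe(item -> log.add("second:" + item));
        log.add("connects:" + shared.connects());
        return log;
    }

    public static void main(String[] args) {
        // prints [first:contributors, second:contributors, connects:1]
        System.out.println(demo());
    }
}
```

The second subscriber receives the buffered item without a second connection, which is the property that makes this combination suitable for sharing a single network response.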