I concat two observables so that data from the cache is displayed first, and then data is loaded from the network and the updated data is shown.
Observable.concat(
        getContentFromCache.subscribeOn(dbScheduler),
        getContentFromNetwork.subscribeOn(networkScheduler)
    ).observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);
If there is no network connection, the second observable fails immediately after OnSubscribe is called.
When the second observable fails immediately, the data from the first observable is lost: the onNext method is never called in the subscriber.
I think this might be due to the following code in OperatorConcat.ConcatSubscriber:
@Override
public void onNext(Observable<? extends T> t) {
    queue.add(nl.next(t));
    if (WIP_UPDATER.getAndIncrement(this) == 0) {
        subscribeNext();
    }
}

@Override
public void onError(Throwable e) {
    child.onError(e);
    unsubscribe();
}
It looks like it unsubscribes as soon as an error is received, and all pending onNext notifications are lost.
What is the best way to solve my problem?
Update
It looks like I have found the solution: instead of setting observeOn on the concatenated observable, I set observeOn on each observable.
Observable.concat(
        getContentFromCache.subscribeOn(dbScheduler).observeOn(AndroidSchedulers.mainThread()),
        getContentFromNetwork.subscribeOn(networkScheduler).observeOn(AndroidSchedulers.mainThread())
    )
    .subscribe(subscriber);
The default behavior of observeOn is that onError events can jump ahead of the queue. Here is the quote from the docs:
Note that onError notifications will cut ahead of onNext notifications on the emission thread if Scheduler is truly asynchronous.
Here is a small test to illustrate this:
Scheduler newThreadScheduler = Schedulers.newThread();
Observable<Integer> stream = Observable.create(integerEmitter -> {
    integerEmitter.onNext(1);
    integerEmitter.onNext(2);
    integerEmitter.onNext(3);
    integerEmitter.onNext(4);
    integerEmitter.onNext(5);
    integerEmitter.onError(new RuntimeException());
}, Emitter.BackpressureMode.NONE);
TestSubscriber<Integer> subscriber = new TestSubscriber<>();
stream.subscribeOn(Schedulers.computation())
    .observeOn(newThreadScheduler).subscribe(subscriber);
subscriber.awaitTerminalEvent();
subscriber.assertValues(1, 2, 3, 4, 5);
subscriber.assertError(RuntimeException.class);
Normally a consumer would expect the following sequence: 1 > 2 > 3 > 4 > 5 > Error. But plain observeOn may put the error ahead of the values, and then the test fails.
This behavior was implemented a long time ago; see https://github.com/ReactiveX/RxJava/issues/1680 for the motivation. To avoid it, one can use the overloaded observeOn with the delayError parameter:
indicates if the onError notification may not cut ahead of onNext notification on the other side of the scheduling boundary. If true a sequence ending in onError will be replayed in the same order as was received from upstream
This is what you normally expect, so changing observeOn(newThreadScheduler) to observeOn(newThreadScheduler, true) will fix the test.
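To make the effect of delayError easier to picture, here is a toy model in plain Java. It is only a sketch and not RxJava's actual implementation: the class ObserveOnModel and the method deliver are hypothetical names, and the model assumes the worst case, where upstream has already queued every item and signalled the error before the downstream thread starts draining.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

/** Toy model of a scheduling boundary; NOT RxJava's real implementation. */
class ObserveOnModel {

    /**
     * Assumes every item is already queued and onError has already been
     * signalled upstream before the downstream side starts draining.
     */
    public static List<String> deliver(List<Integer> items, boolean delayError) {
        Queue<Integer> queue = new ArrayDeque<>(items); // pending onNext values
        List<String> received = new ArrayList<>();

        if (!delayError) {
            // The pending error is checked before the queue, so it wins
            // and everything still queued is dropped.
            queue.clear();
            received.add("onError");
            return received;
        }

        // With delayError the queue is drained first; the error comes last.
        Integer next;
        while ((next = queue.poll()) != null) {
            received.add("onNext(" + next + ")");
        }
        received.add("onError");
        return received;
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(1, 2, 3);
        System.out.println(deliver(items, false)); // [onError]
        System.out.println(deliver(items, true));  // [onNext(1), onNext(2), onNext(3), onError]
    }
}
```

In the real operator the outcome without delayError depends on timing, which is why the test above fails only some of the time.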
Then to @Neil's question: why does the solution proposed by @Rostyslav work? It works because there is no thread switch for the final sequence.
In the proposed solution, the final sequence is built from two sequences on the same thread: the first is the data from the cache, the second is just the error from the network. They are joined on the same thread and there is no thread switch afterwards, since the subscriber already observes on AndroidSchedulers.mainThread(). If you switch the final sequence to some other Scheduler, it will fail again.
Operators in RxJava are generally designed to short-circuit on onError notifications. Because the observables being concatenated are asynchronous sources, you are experiencing that short-circuit. If you don't want it, you could concat materialized observables and then perform whatever processing you need:
Observable.concat(
        getContentFromCache.materialize().subscribeOn(dbScheduler),
        getContentFromNetwork.materialize().subscribeOn(networkScheduler)
    )
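One way to picture what materialize changes: each notification becomes an ordinary value, so an error is just another item in the sequence and cannot overtake the data emitted before it. The plain-Java sketch below models that idea. Signal and MaterializeModel are hypothetical stand-ins (in RxJava 1.x the real type is rx.Notification), and the "show ..." strings are placeholders for whatever the UI would do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy stand-in for a materialized event; hypothetical, not rx.Notification. */
class Signal {
    final Integer value;   // non-null for an onNext signal
    final Throwable error; // non-null for an onError signal

    private Signal(Integer value, Throwable error) {
        this.value = value;
        this.error = error;
    }

    static Signal next(int value) { return new Signal(value, null); }
    static Signal error(Throwable error) { return new Signal(null, error); }
    boolean isError() { return error != null; }
}

class MaterializeModel {

    /** Handles signals strictly in arrival order; an error is a regular item. */
    public static List<String> consume(List<Signal> signals) {
        List<String> shown = new ArrayList<>();
        for (Signal s : signals) {
            if (s.isError()) {
                shown.add("show error: " + s.error.getMessage());
            } else {
                shown.add("show data: " + s.value);
            }
        }
        return shown;
    }

    public static void main(String[] args) {
        List<Signal> signals = Arrays.asList(
                Signal.next(1),                                 // from the cache
                Signal.error(new RuntimeException("offline"))); // from the network
        System.out.println(consume(signals)); // [show data: 1, show error: offline]
    }
}
```

With the real operator, the subscriber would receive Notification objects in onNext and branch on their kind in the same way.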
Another approach would be to use onErrorResumeNext, which replaces the failed network observable with a fallback observable (something below):
Observable.concat(
        getContentFromCache.subscribeOn(dbScheduler),
        getContentFromNetwork.onErrorResumeNext(something)
            .subscribeOn(networkScheduler)
    )