I have an Observable
who reads data from a database. If data is null
I need to get it from the network. So I do flatMap
on first Observable
, check the result of the database operation and if it is null
I start that another Observable
to fetch data from the network.
Note: Observable
s have different Subscriber
s because I have different postprocessing depending on where data comes from (such a logic).
Observable.just(readDataFromDb()).flatMap(new Func1<SomeData, Observable<String>>() {
@Override public Observable<SomeData> call(SomeData s) {
if (s == null) {
getReadFromNetworkObservable().subscribe(new AnotherSubscriber()); // this one might not complete
return Observable.empty(); // I think I need to send this one only after readFromNetwork() completed
} else {
return Observable.just(s);
}
}
}).subscribe(new SomeSubscirber());
Given I send Observable.empty()
to exclude data processing for SomeSubscriber
, I have a foreboding my second Observable
can not always be finished because it might be simply garbage collected. I guess I saw it during my tests.
At this point, I think I just need to wait until Observable
who reads from the network completed and then send Observable.empty()
. So can I make the execution synchronous? But still I have a feeling I do it wrong.
If your observable does not complete, then you will be waiting forever! Handily the Reactive Extensions (Rx) team has added several methods which you can use before you use await to modify the result of what you are awaiting. All of these methods end with the word Async.
If upstream Observable emits one value and not completes — singleOrError () won’t emit any value and will just wait. If we take a look at marble diagram from documentation for example for singleOrError we’ll see exactly same:
To conclude, elementAtOrError () -like operators do not wait for upstream to complete and emit success as soon as upstream provided them with a requested value. If upstream Observable emits one value and not completes-firstOrError () will complete with success.
To conclude, singleOrError () -like operators wait for upstream to complete and only then can emit success. If upstream Observable emits one value and not completes — singleOrError () won’t emit any value and will just wait.
You can make any observable as blocking with .toBlocking
shortcut (See full info https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators)
Data d = getReadFromNetworkObservable()
.toBlocking()
.first() // or single() or singleOrDefault()
// manipulate with data here
Combining cache with network data is described here: http://blog.danlew.net/2015/06/22/loading-data-from-multiple-sources-with-rxjava/
And here: RxJava and Cached Data
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With