
How to wait for Observable's onComplete() in flatMap()

Tags:

rx-java

I have an Observable that reads data from a database. If the data is null, I need to get it from the network. So I call flatMap on the first Observable, check the result of the database operation, and if it is null I start another Observable to fetch the data from the network.

Note: the Observables have different Subscribers because the post-processing differs depending on where the data comes from.

 Observable.just(readDataFromDb()).flatMap(new Func1<SomeData, Observable<SomeData>>() {
        @Override public Observable<SomeData> call(SomeData s) {
          if (s == null) {
            getReadFromNetworkObservable().subscribe(new AnotherSubscriber()); // this one might not complete
            return Observable.empty(); // I think I need to send this one only after readFromNetwork() completed
          } else {
            return Observable.just(s);
          }
        }
      }).subscribe(new SomeSubscriber());

Since I return Observable.empty() to skip data processing in SomeSubscriber, I suspect that my second Observable may not always finish, because it might simply be garbage collected. I think I saw this happen during my tests.

At this point, I think I just need to wait until the Observable that reads from the network has completed and only then return Observable.empty(). So can I make the execution synchronous? Still, I have a feeling I am doing it wrong.

Eugene asked Jul 12 '15 10:07


1 Answer

You can make any Observable blocking with the toBlocking() shortcut (see the full details at https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators):

Data d = getReadFromNetworkObservable()
            .toBlocking()
            .first(); // or single() or singleOrDefault(defaultValue)

// work with the data here

Combining cache with network data is described here: http://blog.danlew.net/2015/06/22/loading-data-from-multiple-sources-with-rxjava/

And here: RxJava and Cached Data
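
If blocking is not desirable, one alternative is to return the network Observable from flatMap instead of subscribing to it inside the function. What follows is only a minimal sketch, assuming RxJava 1.x on the classpath and Java 8 lambdas. The class name DbThenNetwork, the String payload, the log list and the dbValue parameter (standing in for readDataFromDb()) are hypothetical and exist only for illustration. The network-specific post-processing moves into doOnNext(), and ignoreElements() hides the network items from the outer subscriber while still forwarding onCompleted and onError.

```java
import rx.Observable;

import java.util.ArrayList;
import java.util.List;

final class DbThenNetwork {

    // Hypothetical stand-in for the asker's getReadFromNetworkObservable().
    static Observable<String> getReadFromNetworkObservable() {
        return Observable.just("from-network");
    }

    // dbValue plays the role of readDataFromDb(); null means nothing was stored.
    static Observable<String> load(String dbValue, List<String> log) {
        // defer() postpones the "database read" until someone subscribes.
        Observable<String> db = Observable.defer(() -> Observable.just(dbValue));
        return db.flatMap(s -> {
            if (s != null) {
                return Observable.just(s); // database hit: pass the value downstream
            }
            // Database miss: return the network Observable instead of subscribing
            // to it here. flatMap subscribes to it, and the outer stream does not
            // complete until this inner Observable has completed.
            return getReadFromNetworkObservable()
                    .doOnNext(d -> log.add("network:" + d)) // network-only post-processing
                    .ignoreElements(); // drop the items, keep onCompleted/onError
        });
    }

    // Subscribes and records what the outer subscriber sees.
    static List<String> run(String dbValue) {
        List<String> log = new ArrayList<>();
        load(dbValue, log).subscribe(
                d -> log.add("db:" + d),
                e -> log.add("error:" + e),
                () -> log.add("completed"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(null));     // [network:from-network, completed]
        System.out.println(run("cached")); // [db:cached, completed]
    }
}
```

Because flatMap itself holds the subscription to the inner Observable, nothing is left dangling, and the outer subscriber's onCompleted fires only after the network read has finished. Everything in this sketch runs synchronously on the calling thread; with subscribeOn()/observeOn() the same completion ordering should hold, but that is not shown here.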

Sergii Pechenizkyi answered Oct 16 '22 04:10
