
Merging multiple Singles to form an Observable

I am writing an Android application that needs to perform two queries in the following order:

  1. Make a request (let's call it RequestA) to a library that returns a Single<List<String>> of urls.
  2. Based on what I receive from RequestA, make a request (RequestB) to another library for each of these urls. Each RequestB returns a Single.

Now I have to combine the Singles from all of the RequestB calls into one Observable.

Something like Observable.mergeDelayError(List<Single>). I can't do this because mergeDelayError expects an Iterable of ObservableSource.

I know I can achieve this by implementing callbacks and some ugly logic, but I am looking for a solution that uses only the operators provided by Rx.

Dhruv Jagetiya asked Sep 21 '17 09:09


2 Answers

Here is a Kotlin example with explanatory comments. The Java version will be almost identical. You may want to look at the marble diagrams for these operators, which make it easier to see what is going on.

getData() // original API call that returns a Single with the list of urls
     .flatMapObservable { list ->
         // A Single emits only one value, so convert it to an Observable
         // that emits each url onto the stream
         Observable.fromIterable(list)
     }
     .flatMapSingle { url ->
         // One url enters here; the second API returns a Single,
         // so flatMapSingle puts its result onto the stream
         secondApi(url)
     }
     .toList() // optional: collect all results into one list
     .subscribeOn(Schedulers.io()) // do the work on the io scheduler, avoiding NetworkOnMainThreadException and a slow UI
     .observeOn(AndroidSchedulers.mainThread()) // deliver the outcome on the UI thread, where views can be updated safely
     .subscribe { next, error ->
         next?.let {
             // add to an adapter or something else
         }
         error?.let {
             // handle error
         }
     }

Jim Baca answered Oct 18 '22 09:10


You need Observable.fromIterable(). Try this:

Java 8:

public interface Api {

    Single<List<String>> urls();

    Single<String> url(String url);
}

private Api api;

public void request() {
    api.urls()
            .flatMapObservable(Observable::fromIterable)//Observable<String>
            .flatMapSingle(api::url)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe();
}

Non Java 8:

api.urls()
        .flatMapObservable(new Function<List<String>, ObservableSource<? extends String>>() {
            @Override
            public ObservableSource<? extends String> apply(@NonNull List<String> source) throws Exception {
                return Observable.fromIterable(source);
            }
        })
        .flatMapSingle(new Function<String, SingleSource<? extends String>>() {
            @Override
            public SingleSource<? extends String> apply(@NonNull String url) throws Exception {
                return api.url(url);
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe();

Kotlin:

api.urls()
        .flatMapObservable { Observable.fromIterable(it) }
        .flatMapSingle { api.url(it) }
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe()

I'd suggest using Java 8 or switching to Kotlin. Either one makes this code much nicer to write.
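
The question asks for mergeDelayError-like behaviour, which the chains above do not provide: with the one-argument flatMapSingle, the first failing request terminates the stream. A minimal sketch of one way to get it, assuming RxJava 2 (the io.reactivex package; RxJava 3 moved the same classes to io.reactivex.rxjava3.core), is the flatMapSingle overload that takes a delayErrors flag. The class DelayErrorSketch and the methods requestA, requestB and fetchAll are hypothetical stand-ins, not part of either library from the question, and no schedulers are applied so the example runs synchronously.

```java
import io.reactivex.Observable;
import io.reactivex.Single;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DelayErrorSketch {

    // Hypothetical stand-in for RequestA: emits a fixed list of urls.
    static Single<List<String>> requestA() {
        return Single.just(Arrays.asList("a", "bad", "c"));
    }

    // Hypothetical stand-in for RequestB: fails for the url "bad".
    static Single<String> requestB(String url) {
        if ("bad".equals(url)) {
            return Single.error(new IllegalStateException("failed: " + url));
        }
        return Single.just("body of " + url);
    }

    // Returns every successful RequestB result and records any error
    // in the caller-supplied list.
    static List<String> fetchAll(List<Throwable> errors) {
        List<String> results = new ArrayList<>();
        requestA()
                .flatMapObservable(Observable::fromIterable)
                // delayErrors = true: keep running the remaining requests
                // and signal the failure only after they have finished
                .flatMapSingle(DelayErrorSketch::requestB, true)
                .subscribe(results::add, errors::add);
        return results;
    }
}
```

In an Android app you would add subscribeOn(Schedulers.io()) and observeOn(AndroidSchedulers.mainThread()) before subscribe, as in the snippets above, and react to results in the callbacks instead of returning a list.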

Aleksander Mielczarek answered Oct 18 '22 07:10

Aleksander Mielczarek