I have a situation where I have to create N Observable objects based on a request. I understand that I can use zip to combine a known number of Observables, but I'm stuck on how to combine an unknown number of them.
The part I'm not clear on is what function to pass to zip. Depending on the number of Observables, I would have to create a lambda that takes N arguments.
All the Observables return different types of objects.
Update:
I ended up solving this the usual way for a list of parallel requests: use any of the combining Rx operators (flatMap, merge, zip, and so on).
The only special step is to call .subscribeOn(Schedulers.io()) on each request. RxJava then takes care of running the requests in parallel.
To see the effect, try the following:
private void runMyTest() {
    List<Single<String>> singleObservableList = new ArrayList<>();
    singleObservableList.add(getSingleObservable(500, "AAA"));
    singleObservableList.add(getSingleObservable(300, "BBB"));
    singleObservableList.add(getSingleObservable(100, "CCC"));
    Single.merge(singleObservableList)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(System.out::println);
}

private Single<String> getSingleObservable(long waitMilliSeconds, String name) {
    return Single
            .create((SingleOnSubscribe<String>) e -> {
                try {
                    Thread.sleep(waitMilliSeconds);
                } catch (InterruptedException exception) {
                    exception.printStackTrace();
                }
                System.out.println("name = " + name + ", waitMilliSeconds = " + waitMilliSeconds
                        + ", thread name = " + Thread.currentThread().getName()
                        + ", id =" + Thread.currentThread().getId());
                if (!e.isDisposed()) e.onSuccess(name);
            })
            .subscribeOn(Schedulers.io());
}
Output:
System.out: name = CCC, waitMilliSeconds = 100, thread name = RxCachedThreadScheduler-4, id =463
System.out: CCC
System.out: name = BBB, waitMilliSeconds = 300, thread name = RxCachedThreadScheduler-3, id =462
System.out: BBB
System.out: name = AAA, waitMilliSeconds = 500, thread name = RxCachedThreadScheduler-2, id =461
System.out: AAA
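Previous answer (not accurate):
Use the following zip overload; it solved my problem:
zip(java.lang.Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction)
This is the RxJava 1 signature. In RxJava 2 the zipper is a Function that takes an Object[], as in the sample.
A sample: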
public Observable<CombinedData> getCombinedObservables() {
    // Wildcard element type, since each source emits a different type.
    List<Observable<?>> observableList = new ArrayList<>();
    observableList.add(observable1);
    observableList.add(observable2);
    observableList.add(observable3);
    observableList.add(observable4);
    return Observable.zip(observableList, new Function<Object[], CombinedData>() {
        @Override
        public CombinedData apply(Object[] objects) throws Exception {
            // objects[i] holds the value emitted by the i-th Observable in the list.
            return new CombinedData(...);
        }
    });
}
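The zipper in the sample above receives the results as an Object[] in the same order as the sources in the list, so each slot has to be cast back to the type its source emits. Below is a minimal sketch of only that step. It uses java.util.function.Function as a stand-in for RxJava's Function so that it compiles without the library, and the CombinedData fields and the String/Integer element types are assumptions made for illustration, since the original elides them.

```java
import java.util.function.Function;

// Hypothetical result holder; the original does not show CombinedData's fields.
final class CombinedData {
    final String name;
    final int count;

    CombinedData(String name, int count) {
        this.name = name;
        this.count = count;
    }
}

final class ZipperSketch {
    // Stand-in for the zipper passed to Observable.zip(Iterable, zipper).
    // java.util.function.Function replaces RxJava's Function here so the
    // sketch has no library dependency.
    static final Function<Object[], CombinedData> ZIPPER = results -> {
        // results[i] is the value from the i-th source, so cast by position.
        String name = (String) results[0];     // assumed: first source emits String
        Integer count = (Integer) results[1];  // assumed: second source emits Integer
        return new CombinedData(name, count);
    };

    public static void main(String[] args) {
        CombinedData data = ZIPPER.apply(new Object[] {"AAA", 3});
        System.out.println(data.name + " / " + data.count);
    }
}
```

A wrong cast only fails at runtime with a ClassCastException, so the order in which sources are added to the list and the order of casts in the zipper have to be kept in sync by hand.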
You can make your Observables return objects of a common type, merge them, and process the results as a List:
class Result1 implements Result
class Result2 implements Result
class Result3 implements Result

for (Observable<? extends Result> o : yourObservableList)
    // mergeWith returns a new Observable, so keep the result;
    // use concatWith() if you need serial execution
    resultObservable = resultObservable.mergeWith(o);

resultObservable
    .toList()
    // in RxJava 2 and later, toList() returns a Single, so use doOnSuccess here
    .doOnNext(results -> {
        // process your results (List<Result>)
    })
    .subscribe(...)
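As a rough illustration of what the "process your results" step might look like once everything shares the Result type, here is a plain-Java sketch with no RxJava dependency. The describe() method, the fields, and the negative-code rule are all hypothetical and exist only to show the mix of polymorphism and type checks.

```java
import java.util.ArrayList;
import java.util.List;

// Common type shared by all results, as in the answer above.
interface Result {
    String describe();  // hypothetical method, for illustration only
}

final class Result1 implements Result {
    private final String body;
    Result1(String body) { this.body = body; }
    @Override public String describe() { return "Result1:" + body; }
}

final class Result2 implements Result {
    private final int code;
    Result2(int code) { this.code = code; }
    int code() { return code; }
    @Override public String describe() { return "Result2:" + code; }
}

final class ResultProcessing {
    // Stand-in for the callback body that receives the merged List<Result>.
    static List<String> process(List<Result> results) {
        List<String> lines = new ArrayList<>();
        for (Result r : results) {
            // Type-specific handling where the common interface is not enough.
            if (r instanceof Result2 && ((Result2) r).code() < 0) {
                lines.add("error:" + ((Result2) r).code());
            } else {
                lines.add(r.describe());
            }
        }
        return lines;
    }
}
```

Because merge emits items as each source completes, the list is not guaranteed to follow the order of the sources, so the processing should not rely on positions the way a zip-based zipper does.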