My requirements:
null
or error objects while successful ones return the ResponseBody
objects.What I have:
Observable<ResponseBody> api1Call = api1.fetchData();
Observable<ResponseBody> api2Call = api2.fetchData();
Observable<ResponseBody> api3Call = api3.fetchData();
Observable.combineLatest(api1Call, api2Call, api3Call, new Func2<ResponseBody, ResponseBody, ResponseBody, Object>() {
@Override
public Object call(ResponseBody responseBody1, ResponseBody responseBody2, ResponseBody responseBody3) {
Logger.i("what does this do? - '%s', '%s', '%s'", responseBody1, responseBody2, responseBody3);
return null;
}
}).onErrorResumeNext(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {
Logger.e(throwable, "some error with one of the apis?");
return Observable.empty();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Object>() {
@Override
public void onCompleted() {
Logger.i("onCompleted");
}
@Override
public void onError(Throwable e) {
Logger.e(e, "onError");
}
@Override
public void onNext(Object o) {
Logger.i("onNext " + o);
}
});
The output I got:
some error with one of the apis?
// stacktrace of the error
onCompleted
I'm new to RxJava and very confused. I found some answers on StackOverflow saying zip
does similar thing but it's even further from my requirements. I'm guessing one of the "combine" operators + proper exception handing will do what I need. It's just been really hard to figure it out so far
Versions I'm using:
compile 'io.reactivex:rxjava:1.3.0'
compile 'io.reactivex:rxandroid:1.2.1'
compile 'com.squareup.retrofit2:adapter-rxjava:2.3.0'
If you want to merge observables of different type you need to use Observable. zip : Observable<String> o1 = Observable. just("a", "b", "c"); Observable<Integer> o2 = Observable.
By default, nothing in RxJava is multi-threaded. Multi-threading can easily be introduced, however, by using Schedulers. For example, if you did this: Observable.
You can not achieve parallel via combineLast
or zip
, rxjava
will execute and emit your items in sequence in my testing.
If one of your task fail, your Func2#call
will not get called and onError
will submitted instead. You even can not get the results of other successful tasks in this way.
The solution is flatMap
, it's the traditional way to achieve concurrent in rxjava
. It also meet your other requirements.
Here is a small but completed example.
I use a simple website service to test.
I use a Semaphore
to wait for all task done, you can completely ignore it. And I add logging to the http request for better understanding, you can complete ignore it also.
public interface WebsiteService {
@GET
Observable<ResponseBody> website(@Url String url);
}
Then I use the following to test the result with rxjava
.
HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor();
loggingInterceptor.setLevel(HttpLoggingInterceptor.Level.BASIC);
Retrofit retrofit = new Retrofit.Builder().baseUrl("https://www.google.com")
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.client(new OkHttpClient.Builder().addInterceptor(loggingInterceptor).build())
.build();
WebsiteService websiteService = retrofit.create(WebsiteService.class);
final Semaphore s = new Semaphore(1);
try {
s.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
Observable<ResponseBody> first = websiteService.website("http://github.com");
Observable<ResponseBody> second = websiteService.website("http://stackoverflow.com");
Observable<ResponseBody> third = websiteService.website("http://notexisting.com");
final int numberOfCalls = 3; // testing for three calls
Observable.just(first, second, third)
.flatMap(new Function<Observable<ResponseBody>, ObservableSource<ResponseBody>>() {
@Override
public ObservableSource<ResponseBody> apply(@NonNull Observable<ResponseBody> responseBodyObservable) throws Exception {
return responseBodyObservable.subscribeOn(Schedulers.computation());
}
})
.subscribeOn(Schedulers.computation())
.subscribe(new Observer<ResponseBody>() {
private int currentDoneCalls = 0;
private void checkShouldReleaseSemaphore() {
if (currentDoneCalls >= numberOfCalls) {
s.release();
}
}
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull ResponseBody responseBody) {
System.out.println("Retrofit call success " + responseBody.contentType());
synchronized (this) {
currentDoneCalls++;
}
checkShouldReleaseSemaphore();
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println("Retrofit call failed " + e.getMessage());
synchronized (this) {
currentDoneCalls++;
}
checkShouldReleaseSemaphore();
}
@Override
public void onComplete() {
System.out.println("onComplete, All request success");
checkShouldReleaseSemaphore();
}
});
try {
s.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("All request done");
s.release();
}
I use rxjava2
and retrofit adapter-rxjava2
for testing.
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.0'
compile 'com.squareup.retrofit2:retrofit:2.3.0'
compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
compile 'com.squareup.okhttp3:logging-interceptor:3.8.1'
The introduction page of RxJava2 from github has pointed out the practical way to implement paralellism.
Practically, paralellism in RxJava means running independent flows and merging their results back into a single flow. The operator
flatMap
does this...
Although this example is based on RxJava2
, the operation flatMap
is
already existing in RxJava
.
I think in your use case Zip operator it´s more suitable
Here you can see running in the main thread, but also it´s possible make it run every one of them in another thread if you use observerOn
/**
* Since every observable into the zip is created to subscribeOn a different thread, it´s means all of them will run in parallel.
* By default Rx is not async, only if you explicitly use subscribeOn.
*/
@Test
public void testAsyncZip() {
scheduler = Schedulers.newThread();
scheduler1 = Schedulers.newThread();
scheduler2 = Schedulers.newThread();
long start = System.currentTimeMillis();
Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2)
.concat(s3))
.subscribe(result -> showResult("Async in:", start, result));
}
private Observable<String> obAsyncString() {
return Observable.just("")
.observeOn(scheduler)
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> "Hello");
}
private Observable<String> obAsyncString1() {
return Observable.just("")
.observeOn(scheduler1)
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> " World");
}
private Observable<String> obAsyncString2() {
return Observable.just("")
.observeOn(scheduler2)
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> "!");
}
You can see more examples here https://github.com/politrons/reactive
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With