
How to make multiple API requests with RxJava and combine them?

I have to make N REST API calls and combine the results of all of them, or fail if at least one of the calls failed (returned an error or a timeout). I want to use RxJava and I have some requirements:

  • Be able to configure retries for each individual API call under some circumstances. I mean, if I have retry = 2 and I make 3 requests, each one has to be retried at most 2 times, with at most 6 requests in total.
  • Fail fast! If one API call has failed N times (where N is the configured number of retries), it doesn't matter whether the remaining requests have finished; I want to return an error.

If I wish to make all the requests with a single thread, I would need an async HTTP client, wouldn't I?

Thanks.

gabrielgiussi asked Sep 15 '25 17:09



1 Answer

You could use the zip operator to zip all the requests together once they end, and check there whether all of them succeeded.

private Scheduler scheduler;
private Scheduler scheduler1;
private Scheduler scheduler2;

/**
 * Every observable passed to zip uses observeOn with a different scheduler, which means all of them run in parallel.
 * By default Rx is not async; it only becomes async if you explicitly use subscribeOn or observeOn.
 */
@Test
public void testAsyncZip() {
    scheduler = Schedulers.newThread();
    scheduler1 = Schedulers.newThread();
    scheduler2 = Schedulers.newThread();
    long start = System.currentTimeMillis();
    Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(),
            (s, s2, s3) -> s.concat(s2).concat(s3))
            .subscribe(result -> showResult("Async in:", start, result));
}

private Observable<String> obAsyncString() {
    return Observable.just("Request1")
            .observeOn(scheduler)
            .doOnNext(val -> {
                System.out.println("Thread " + Thread.currentThread()
                        .getName());
            })
            .map(val -> "Hello");
}

private Observable<String> obAsyncString1() {
    return Observable.just("Request2")
            .observeOn(scheduler1)
            .doOnNext(val -> {
                System.out.println("Thread " + Thread.currentThread()
                        .getName());
            })
            .map(val -> " World");
}

private Observable<String> obAsyncString2() {
    return Observable.just("Request3")
            .observeOn(scheduler2)
            .doOnNext(val -> {
                System.out.println("Thread " + Thread.currentThread()
                        .getName());
            })
            .map(val -> "!");
}

In this example we just concatenate the results, but instead of doing that, you can check the results and apply your business logic there.
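For the retry and fail-fast requirements from the question, something along these lines might work. This is only a sketch: it assumes RxJava 3 package names (io.reactivex.rxjava3), and the class ZipWithRetry and the methods request and combine are made-up names for illustration. Note that retry(n) resubscribes up to n times after the first attempt, so retry(2) allows up to three attempts per call; adjust the count if "retry = 2" is meant as two attempts in total.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

public class ZipWithRetry {

    /** Hypothetical helper: runs one blocking call on an I/O thread, with a timeout and retries per call. */
    static Observable<String> request(Callable<String> call, int retries, long timeoutMs) {
        return Observable.fromCallable(call)
                .subscribeOn(Schedulers.io())               // each request gets its own thread
                .timeout(timeoutMs, TimeUnit.MILLISECONDS)  // a timeout counts as a failure
                .retry(retries);                            // resubscribe (call again) up to `retries` times
    }

    /** Zips N requests; zip signals the error as soon as one source has exhausted its retries. */
    static Observable<List<String>> combine(List<Callable<String>> calls, int retries, long timeoutMs) {
        List<Observable<String>> requests = new ArrayList<>();
        for (Callable<String> call : calls) {
            requests.add(request(call, retries, timeoutMs));
        }
        return Observable.zip(requests, results -> {
            // The Iterable overload of zip hands the results over as Object[], in source order.
            List<String> combined = new ArrayList<>();
            for (Object result : results) {
                combined.add((String) result);
            }
            return combined;
        });
    }

    public static void main(String[] args) {
        List<Callable<String>> calls = new ArrayList<>();
        calls.add(() -> "Hello");    // stand-ins for real HTTP calls
        calls.add(() -> " World");
        calls.add(() -> "!");
        System.out.println(combine(calls, 2, 1000).blockingFirst());
    }
}
```

Because zip (without delayError) terminates on the first error it receives, the remaining requests are disposed and the subscriber gets the error without waiting for them.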

You can also consider merge or concat.
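To illustrate how merge and concat differ, here is a small sketch, again assuming RxJava 3 package names. fakeRequest is a hypothetical stand-in for a real call that answers after a delay. merge subscribes to all sources at once and emits results as they arrive, while concat subscribes to the next source only after the previous one completes, so the requests run one after another.

```java
import io.reactivex.rxjava3.core.Observable;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class MergeVsConcat {

    /** Hypothetical stand-in for a request that answers after delayMs milliseconds. */
    static Observable<String> fakeRequest(String name, long delayMs) {
        return Observable.just(name).delay(delayMs, TimeUnit.MILLISECONDS);
    }

    /** Both requests start immediately; results come in arrival order. */
    static List<String> merged() {
        return Observable.merge(fakeRequest("slow", 300), fakeRequest("fast", 50))
                .toList()
                .blockingGet();
    }

    /** The second request starts only after the first completes; results keep source order. */
    static List<String> concatenated() {
        return Observable.concat(fakeRequest("slow", 300), fakeRequest("fast", 50))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println("merge:  " + merged());
        System.out.println("concat: " + concatenated());
    }
}
```

Unlike zip, neither operator pairs the results up, so you would collect them yourself (here with toList). Both still terminate with the first error they receive, unless a delayError variant is used.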

You can take a look at more examples here: https://github.com/politrons/reactive

paul answered Sep 18 '25 10:09
