Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to abstract asynchronous HTTP requests as synchronous

I have a third party REST API service that behaves 'asynchronously'; as in requests respond with intermediate responses that are supplemented by callbacks with a correlator to the intermediate response.

The callbacks are returned almost immediately via a 'callback URL'.

I am trying to devise a solution, sort of an adapter to call this resource as though it was 'synchronous' because it is cumbersome to deal with the callbacks, especially when I need to batch successive requests serially to other similar APIs by the same thirdparty. Basically I want to abstract the green part so that the caller only gets full callback, an error or timeout exception. enter image description here

My research points at using RxJava but I can't figure out how this problem can be solved by principles of reactive programming(my understanding is limited).

Design considerations:

  1. Persisting correlator to a datastore for later lookup upon callback is not desirable because it is expensive
  2. Wait strategy for the callback is OK because the response times for the callback is less than 1 sec

How can I employ a CompletableFuture, or Observable-Observer pattern to wait for the callback and return to the caller?

like image 423
kosgeinsky Avatar asked Sep 19 '25 15:09

kosgeinsky


1 Answers

Consider using a CountDownLatch for the main thread to wait until a worker thread has dealt with the third party API. The worker gets the callbacks so it knows when the request has made progress, is complete, timed out, had an error, etc.

Here's a rough simulation:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;

public class TreatAsSync {

    public static void main(String[] args) {
        TreatAsSync app = new TreatAsSync();
        app.call();
    }

    private void call() {
        RestClient restClient = new RestClient();
        Request request = new Request();
        Response response = restClient.call(request);
        System.out.println("Response was: " + response);
    }

    private class Request {

    }

    private class Response {
        private final boolean error;
        private final boolean timedOut;
        private final String result;

        public Response(boolean error, boolean timedOut, String result) {
            this.error = error;
            this.timedOut = timedOut;
            this.result = result;
        }

        public String toString() {
            return "error:" + error + ", timedOut: " + timedOut + ", result: " + result;
        }
    }

    private class ResponseWrapper {
        private Response response;

        public Response getResponse() {
            return response;
        }

        public void setRespose(Response response) {
            this.response = response;
        }
    }

    private class RestClient {

        public Response call(Request request) {
            ResponseWrapper wrapper = new ResponseWrapper();
            CountDownLatch latch = new CountDownLatch(1);

            ThirdPartyRunner runner = new ThirdPartyRunner(request, wrapper, latch);
            new Thread(runner).start();

            try {
                latch.await();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            return wrapper.getResponse();
        }
    }

    private interface ThirdPartyCallBack {
        public void onProgress(Response response);
        public void onComplete(Response response);
        public void onTimeOut(Response response);
        public void onError(Response response);
    }

    private class ThirdPartyRunner implements ThirdPartyCallBack, Runnable {
        private final Request request;
        private final ResponseWrapper wrapper;
        private final CountDownLatch latch;

        public ThirdPartyRunner(Request request, ResponseWrapper wrapper, CountDownLatch latch) {
            this.request = request;
            this.wrapper = wrapper;
            this.latch = latch;
        }

        @Override
        public void onProgress(Response response) {
            System.out.println("some progress was made...");
        }

        @Override
        public void onComplete(Response response) {
            System.out.println("request completed");
            finished(response);
        }

        @Override
        public void onTimeOut(Response response) {
            System.out.println("request timed out");
            finished(response);
        }

        @Override
        public void onError(Response response) {
            System.out.println("request had an error");
            finished(response);
        }

        private void finished(Response response) {
            wrapper.setRespose(response);
            latch.countDown();
        }

        @Override
        public void run() {
            try {
                callThirdParty();
            } catch (Exception e) {
                finished(new Response(true, false, e.getMessage()));
            }
        }

        private void callThirdParty() {
            // simulate api.call(request, this);
            for (int i = 0; i < ThreadLocalRandom.current().nextInt(10) + 1; i++) {
                onProgress(new Response(false, false, "in progress"));
            }

            switch (ThreadLocalRandom.current().nextInt(3)) {
            case 0:
                onComplete(new Response(false, false, "done"));
                break;

            case 1:
                onTimeOut(new Response(false, true, "hello?"));
                break;

            case 2:
                onError(new Response(true, false, "uh oh!"));
                break;
            }
        }
    }

}
like image 95
Andrew S Avatar answered Sep 21 '25 03:09

Andrew S