Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to cancel AsyncRestTemplate HTTP request if they are taking too much time?

Since starting, I was always confuse of how to deal with InterruptedException and how to properly cancel the http request if they are taking too much time. I have a library in which I have provided two methods, sync and async for our customer. They can call whichever method they feel is right for their purpose.

  • executeSync() - waits until I have a result, returns the result.
  • executeAsync() - returns a Future immediately which can be processed after other things are done, if needed.

They will pass DataKey object which has the user id and timeout value in it. We will figure out which machine to call basis on the user id and then create an URL with that machine and we will make http call to the URL using AsyncRestTemplate and then send the response back to them basis on whether it is successful or not.

I am using exchange method of AsyncRestTemplate which returns back a ListenableFuture and I wanted to have async non blocking architecture with NIO based client connections so that request uses non blocking IO so that's why I went with AsyncRestTemplate. Does this approach sounds right for my problem definition? This library will be used in production under very heavy load.

Below is my interface:

public interface Client {
    // for synchronous
    public DataResponse executeSync(DataKey key);

    // for asynchronous
    public ListenableFuture<DataResponse> executeAsync(DataKey key);
}

And below is my implementation of the interface:

public class DataClient implements Client {

    // using spring 4 AsyncRestTemplate
    private final AsyncRestTemplate restTemplate = new AsyncRestTemplate();

    // for synchronous
    @Override
    public DataResponse executeSync(DataKey keys) {
        Future<DataResponse> responseFuture = executeAsync(keys);
        DataResponse response = null;

        try {
            response = responseFuture.get(keys.getTimeout(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            // do we need to catch InterruptedException here and interrupt the thread?
            Thread.currentThread().interrupt();
            // also do I need throw this RuntimeException at all?
            throw new RuntimeException("Interrupted", ex);
        } catch (TimeoutException ex) {
            DataLogging.logEvents(ex, DataErrorEnum.CLIENT_TIMEOUT, keys);
            response = new DataResponse(null, DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR);
            responseFuture.cancel(true); // terminating the tasks that got timed out so that they don't take up the resources?
        } catch (Exception ex) {
            DataLogging.logEvents(ex, DataErrorEnum.ERROR_CLIENT, keys);
            response = new DataResponse(null, DataErrorEnum.ERROR_CLIENT, DataStatusEnum.ERROR);
        }

        return response;
    }

    // for asynchronous     
    @Override
    public ListenableFuture<DataResponse> executeAsync(final DataKey keys) {

        final SettableFuture<DataResponse> responseFuture = SettableFuture.create();
        final org.springframework.util.concurrent.ListenableFuture orig = 
            restTemplate.exchange(createURL(keys), HttpMethod.GET, keys.getEntity(), String.class);

        orig.addCallback(
                new ListenableFutureCallback<ResponseEntity<String>>() {
                    @Override
                    public void onSuccess(ResponseEntity<String> result) {
                        responseFuture.set(new DataResponse(result.getBody(), DataErrorEnum.OK,
                                DataStatusEnum.SUCCESS));
                    }

                    @Override
                    public void onFailure(Throwable ex) {
                        DataLogging.logErrors(ex, DataErrorEnum.ERROR_SERVER, keys);
                        responseFuture.set(new DataResponse(null, DataErrorEnum.ERROR_SERVER,
                                DataStatusEnum.ERROR));
                    }
                });

        // propagate cancellation back to the original request
        responseFuture.addListener(new Runnable() {
          @Override public void run() {
             if (responseFuture.isCancelled()) {
               orig.cancel(false); // I am keeping this false for now
             }
          }
        }, MoreExecutors.directExecutor());
        return responseFuture;
    }
}

And customer will be calling like this from their code -

// if they are calling executeSync() method
DataResponse response = DataClientFactory.getInstance().executeSync(dataKey);

// and if they want to call executeAsync() method
Future<DataResponse> response = DataClientFactory.getInstance().executeAsync(dataKey);

Now the question is -

  1. Can we interrupt AsyncRestTemplate call if http request is taking too long? I am actually calling cancel on my future in my above code in executeSync method but I am not sure how do I verify it to make sure it is doing what it should? I want to propagate cancellation back to the original future, so that I can cancel the corresponding http request (which I probably want to do to save resources) so that's why I have added a listener in my executeAsync method. I believe, we cannot interrupt RestTemplate calls but not sure on AsyncRestTemplate whether we can do that or not. If let's say we can interrupt AsyncRestTemplate calls, then am I doing everything right to interrupt the http calls? Or is there any better/cleaner way to do this? Or Do I even need to worry about cancelling the Http request with AsyncRestTemplate with my current design?

        // propagate cancellation back to the original request
        responseFuture.addListener(new Runnable() {
          @Override public void run() {
             if (responseFuture.isCancelled()) {
               orig.cancel(false); // I am keeping this false for now
             }
          }
        }, MoreExecutors.directExecutor()); 
    

    With the current setup, I can see it is throwing CancellationException some of the times (not everytime)- Does that mean my HTTP request was cancelled then?

  2. Also am I doing the right thing in catch block of InterruptedException in executeSync method? If not, then what's the right way to deal with that. And do I need to deal with InterruptedException at all in my case?
  3. Is it true that by default AsyncRestTamplete uses blocking calls and request per thread? If yes, then is there any way to have NIO based client connections in my current setup?

Any explanations/code suggestions will be of great help.

like image 511
john Avatar asked Apr 01 '15 00:04

john


1 Answers

First of all, Why are you using SettableFuture? Why can't just return the ListenableFuture returned by AsyncRestTemplate?

1. Can we interrupt AsyncRestTemplate call if http request is taking too long?

Of course you do! You only need to call Future.cancel method. This method will interrupt the execution of the internal RestTemplate that AsyncRestTemplate is actually using.

2. Also am I doing the right thing in catch block of InterruptedException in executeSync method?

As Phil and Danilo have said, you don't need to interrupt the current thread within the InterruptedException catch block. Just do whatever you need to do when the execution of the request must be canceled.

In fact, I recommend you create a method that handles this behaviour, something like handleInterruption, and use this method for both TimeoutException and InterruptedException.

3. Is it true that by default AsyncRestTamplete uses blocking calls and request per thread?

Yes. The default constructor of AsyncRestTamplete is internally using SimpleClientHttpRequestFactory and SimpleAsyncTaskExecutor.

This TaskExecutor always starts a threat for every task, and never reuse Threads, so it's very inefficient:

 * TaskExecutor implementation that fires up a new Thread for each task,
 * executing it asynchronously.
 *
 * Supports limiting concurrent threads through the "concurrencyLimit"
 * bean property. By default, the number of concurrent threads is unlimited.
 *
 * NOTE: This implementation does not reuse threads! Consider a
 * thread-pooling TaskExecutor implementation instead, in particular for
 * executing a large number of short-lived tasks.
 *

I recommend you use another configuration of AsyncRestTemplate.

You should use the constructor of AsyncRestTemplate that uses another TaskExecutor:

public AsyncRestTemplate(AsyncListenableTaskExecutor taskExecutor)

For instance:

AsyncRestTemplate template = new AsyncRestTemplate(new ConcurrentTaskExecutor(Executors.newCachedThreadPool()));

This ExecutorService (Executors.newCachedThreadPool()) creates new threads as needed, but will reuse previously constructed threads when they are available.

Or even better, you can use another RequestFactory. For instance, you can use HttpComponentsAsyncClientHttpRequestFactory, that internally uses NIO, just calling the proper constructor of AsyncRestTemplate:

new AsyncRestTemplate(new HttpComponentsAsyncClientHttpRequestFactory())

Don't forget the internal behaviour of AsyncRestTemplate will depend on how you create the object.

like image 68
jfcorugedo Avatar answered Nov 06 '22 01:11

jfcorugedo