I need to build a library that offers both a synchronous and an asynchronous feature.
executeSynchronous()
- waits until I have a result, then returns the result.
executeAsynchronous()
- returns a Future immediately, which can be processed after other things are done, if needed.
Core Logic of my Library
The customer will use our library by passing a DataKey builder object. We then construct a URL from that DataKey object, make an HTTP client call to that URL, and once we get the response back as a JSON String, we send that JSON String back to the customer as-is by creating a DataResponse object. Some customers will call executeSynchronous() and some might call executeAsynchronous(), which is why I need to provide the two methods separately in my library.
Interface:
public interface Client {
    // for synchronous
    public DataResponse executeSynchronous(DataKey key);

    // for asynchronous
    public Future<DataResponse> executeAsynchronous(DataKey key);
}
And then I have my DataClient
which implements the above Client
interface:
public class DataClient implements Client {
    private RestTemplate restTemplate = new RestTemplate();
    private ExecutorService executor = Executors.newFixedThreadPool(10);

    // for synchronous call
    @Override
    public DataResponse executeSynchronous(DataKey key) {
        DataResponse dataResponse = null;
        Future<DataResponse> future = null;
        try {
            future = executeAsynchronous(key);
            dataResponse = future.get(key.getTimeout(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.TIMEOUT_ON_CLIENT, key);
            dataResponse = new DataResponse(null, DataErrorEnum.TIMEOUT_ON_CLIENT, DataStatusEnum.ERROR);
            // does this looks right?
            future.cancel(true); // terminating tasks that have timed out
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }
        return dataResponse;
    }

    // for asynchronous call
    @Override
    public Future<DataResponse> executeAsynchronous(DataKey key) {
        Future<DataResponse> future = null;
        try {
            Task task = new Task(key, restTemplate);
            future = executor.submit(task);
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
        }
        return future;
    }
}
Simple class which will perform the actual task:
public class Task implements Callable<DataResponse> {
    private DataKey key;
    private RestTemplate restTemplate;

    public Task(DataKey key, RestTemplate restTemplate) {
        this.key = key;
        this.restTemplate = restTemplate;
    }

    @Override
    public DataResponse call() {
        DataResponse dataResponse = null;
        String response = null;
        try {
            String url = createURL();
            response = restTemplate.getForObject(url, String.class);
            // it is a successful response
            dataResponse = new DataResponse(response, DataErrorEnum.NONE, DataStatusEnum.SUCCESS);
        } catch (RestClientException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.SERVER_DOWN, key);
            dataResponse = new DataResponse(null, DataErrorEnum.SERVER_DOWN, DataStatusEnum.ERROR);
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }
        return dataResponse;
    }

    // create a URL by using key object
    private String createURL() {
        String url = somecode;
        return url;
    }
}
When I started working on this solution, I was not terminating the tasks that had timed out. I was reporting the timeout to the client, but the task continued to run in the thread pool (potentially occupying one of my limited 10 threads for a long time). So I did some research online and found that I can cancel tasks that have timed out by calling cancel on the future, as shown below:
future.cancel(true);
But if I do this as shown in my solution above, do I need to close any other resources, such as RestTemplate, as soon as the thread is interrupted? If yes, how would I do that? Also, can RestTemplate calls be interrupted? I tried calling cancel on my future as soon as the task timed out, but I suspect my thread did not get interrupted.
Should we always terminate tasks that have timed out? If we don't, what impact might that have? Will it affect performance?
Is there a better way to handle this case with my current setup?
It appears that a call to a RestTemplate cannot be interrupted or canceled. Even if the "kludge" using a callback is utilized, the RestTemplate might have resources locked up internally, waiting for the response before invoking the callback.
When the underlying socket is accessible, network I/O can be aborted by closing the socket from another thread. For example, a timer can be started to close the socket after a timeout elapses. Or, if you want an indefinite timeout that is sensitive to interrupts (due to a user pressing a "Cancel" button, for example), you can submit a task that waits indefinitely but responds to interrupts by closing the socket.
Unfortunately, it doesn't look like the authors of RestTemplate provided this capability.
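To illustrate the socket-closing technique described above in isolation, here is a minimal sketch using only the JDK, with no RestTemplate involved. The class name SocketCloseDemo and the local "silent" server are made up for the demonstration. A worker thread blocks in a socket read, and the caller closes the socket once its own timeout elapses.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of the library discussed in the question.
class SocketCloseDemo {

    /** Returns a short description of how the blocked read ended. */
    static String run() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        // Local server that accepts the connection but never sends a byte.
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getByName("127.0.0.1"));
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket accepted = server.accept()) {

            Callable<String> blockingRead = () -> {
                try {
                    InputStream in = client.getInputStream();
                    int b = in.read();   // blocks: the peer never writes
                    return "read returned " + b;
                } catch (IOException e) {
                    return "aborted: " + e.getClass().getSimpleName();
                }
            };
            Future<String> future = pool.submit(blockingRead);

            try {
                return future.get(300, TimeUnit.MILLISECONDS);
            } catch (TimeoutException timedOut) {
                // Closing the socket from this thread makes the blocked read fail,
                // which frees the worker thread.
                client.close();
                return future.get(2, TimeUnit.SECONDS);
            }
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());   // typically reports that the read was aborted
    }
}
```

The sketch only works because the caller holds a reference to the socket, which is exactly the access that RestTemplate does not expose.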
Yes, you should clean up resources that are no longer needed because of task cancellation or expiration. Yes, it will affect performance. If your thread pool has a limited number of threads, eventually all will be stuck in defunct tasks. If it has an unlimited number of threads, eventually memory will become exhausted.
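The thread-pool exhaustion described above can be reproduced with a small, JDK-only sketch. Everything in it is an assumption for illustration: the pool has 2 threads instead of 10, and the "hung" tasks wait on a latch instead of a remote server. Unlike a blocking socket read, a latch wait does respond to interrupts, so here cancel(true) actually frees the workers.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; names are illustrative only.
class PoolStarvationDemo {

    /** Returns { thirdTaskRanWhilePoolWasStuck, thirdTaskRanAfterCancel }. */
    static boolean[] run() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountDownLatch never = new CountDownLatch(1);   // never counted down
        try {
            // Two "hung" tasks occupy both worker threads.
            Callable<String> hung = () -> { never.await(); return "done"; };
            Future<String> first = pool.submit(hung);
            Future<String> second = pool.submit(hung);

            // A third, trivial task is queued behind them.
            Callable<String> quick = () -> "third";
            Future<String> third = pool.submit(quick);
            boolean ranWhileStuck = completesWithin(third, 300);

            // cancel(true) interrupts the workers; await() reacts to the interrupt.
            first.cancel(true);
            second.cancel(true);
            boolean ranAfterCancel = completesWithin(third, 2000);

            return new boolean[] { ranWhileStuck, ranAfterCancel };
        } finally {
            pool.shutdownNow();
        }
    }

    /** True if the future finishes within the given number of milliseconds. */
    static boolean completesWithin(Future<?> f, long millis) throws Exception {
        try {
            f.get(millis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        }
    }
}
```

If the hung tasks were stuck in non-interruptible I/O instead, cancel(true) would mark the futures as cancelled but the worker threads would stay occupied, and the third task would keep waiting.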
Sometimes it is not possible to interrupt a thread, especially when the thread is performing a blocking operation on a Socket.
So instead of cancelling the task when it times out, you should set timeouts on the HTTP connection.
Unfortunately, timeouts are set per connection factory and therefore per RestTemplate, so if the timeout varies per request, each request must use its own RestTemplate.
You can create a new RestTemplate per task, or reuse previously created templates using ThreadLocal or resource pooling.
For example, a task using ThreadLocal might look like this:
public class Task implements Callable<DataResponse> {
    private DataKey key;

    // static, so that each pool thread keeps one RestTemplate across tasks;
    // as an instance field, every new Task would create a new RestTemplate
    private static final ThreadLocal<RestTemplate> restTemplateThreadLocal =
            ThreadLocal.withInitial(() -> new RestTemplate(new SimpleClientHttpRequestFactory()));

    public Task(DataKey key) {
        this.key = key;
    }

    private SimpleClientHttpRequestFactory getConnectionFactory() {
        return (SimpleClientHttpRequestFactory) restTemplateThreadLocal.get().getRequestFactory();
    }

    @Override
    public DataResponse call() {
        DataResponse dataResponse = null;
        String response = null;
        try {
            String url = createURL();
            // it is up to you how to derive the connect and read timeouts from key.getTimeout()
            getConnectionFactory().setConnectTimeout(1000);
            // setReadTimeout takes an int (milliseconds); the cast matters if getTimeout() returns long
            getConnectionFactory().setReadTimeout((int) key.getTimeout());
            response = restTemplateThreadLocal.get().getForObject(url, String.class);
            // it is a successful response
            dataResponse = new DataResponse(response, DataErrorEnum.NONE, DataStatusEnum.SUCCESS);
        } catch (RestClientException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.SERVER_DOWN, key);
            dataResponse = new DataResponse(null, DataErrorEnum.SERVER_DOWN, DataStatusEnum.ERROR);
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }
        return dataResponse;
    }

    // create a URL by using key object
    private String createURL() {
        String url = somecode;
        return url;
    }
}
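SimpleClientHttpRequestFactory is built on the JDK's HttpURLConnection, so the effect of the connect and read timeouts set in the task above can be seen without Spring. The sketch below is only an illustration under assumptions: the class name ReadTimeoutDemo and the local server that never answers are invented for the demo. The point is that the blocked call gives up on its own, so no external cancellation is needed to free the worker thread.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
import java.net.URI;

// Hypothetical demo class, JDK only.
class ReadTimeoutDemo {

    /** Returns true if the request was abandoned because the read timeout fired. */
    static boolean timesOut(int readTimeoutMillis) throws IOException {
        // The OS completes the TCP handshake for a listening socket,
        // but nothing ever reads the request or writes a response.
        try (ServerSocket silent = new ServerSocket(0, 1, InetAddress.getByName("127.0.0.1"))) {
            URI uri = URI.create("http://127.0.0.1:" + silent.getLocalPort() + "/");
            HttpURLConnection conn = (HttpURLConnection) uri.toURL().openConnection();
            conn.setConnectTimeout(1000);              // max time to establish the connection
            conn.setReadTimeout(readTimeoutMillis);    // max time to wait for response data
            try {
                conn.getResponseCode();                // sends the request, waits for the status line
                return false;
            } catch (SocketTimeoutException e) {
                return true;                           // the calling thread is released here
            } finally {
                conn.disconnect();
            }
        }
    }
}
```

Note that the read timeout limits each wait for data, not the total duration of the request, so a server that trickles bytes slowly can still hold a thread longer than key.getTimeout().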
BTW, Spring also provides AsyncRestTemplate, which may make your code simpler. If used with Netty4ClientHttpRequestFactory, you get NIO-based client connections. In that case, you should be able to interrupt your tasks even while they are making the HTTP connection.
A short sample is below. It uses NIO, so you do not have to worry about whether the request is really cancelled after the timeout.
URI url = new URI("http://www.chicagotribune.com/news/ct-college-of-dupage-investigation-met-20150330-story.html");
Netty4ClientHttpRequestFactory asyncRequestFactory = new Netty4ClientHttpRequestFactory();
AsyncRestTemplate asyncRestTemplate = new AsyncRestTemplate(asyncRequestFactory);
ListenableFuture<ResponseEntity<String>> entity = asyncRestTemplate.getForEntity(url, String.class);
System.out.println("entity.get() = " + entity.get());
asyncRequestFactory.destroy();
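If pulling in Netty is not an option, a similar non-blocking style is available in the JDK itself (Java 11 and later) through java.net.http.HttpClient. This is a different API from AsyncRestTemplate, shown here only as a hedged sketch of the same idea: the request carries its own timeout and no pool thread sits blocked on the socket. The class name AsyncTimeoutDemo and the silent local server are assumptions made for the demo.

```java
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpTimeoutException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical demo class using the JDK HTTP client instead of AsyncRestTemplate.
class AsyncTimeoutDemo {

    /** Returns true if the asynchronous request failed with a timeout. */
    static boolean timesOut(Duration timeout) throws Exception {
        // Listening socket that never answers, standing in for a hung server.
        try (ServerSocket silent = new ServerSocket(0, 1, InetAddress.getByName("127.0.0.1"))) {
            HttpClient client = HttpClient.newHttpClient();
            HttpRequest request = HttpRequest.newBuilder(
                        URI.create("http://127.0.0.1:" + silent.getLocalPort() + "/"))
                    .timeout(timeout)   // per-request timeout, no shared factory state
                    .GET()
                    .build();

            // Returns immediately; the response is delivered through the future.
            CompletableFuture<HttpResponse<String>> future =
                    client.sendAsync(request, HttpResponse.BodyHandlers.ofString());
            try {
                future.get();
                return false;
            } catch (ExecutionException e) {
                return e.getCause() instanceof HttpTimeoutException;
            }
        }
    }
}
```

Because the timeout is part of the request rather than the client, one HttpClient instance can be shared across requests with different timeouts, which avoids the per-request RestTemplate workaround.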