I have a library which is being used by customer and they are passing DataRequest
object which has userid
, timeout
and some other fields in it. Now I use this DataRequest
object to make a URL and then I make an HTTP call using RestTemplate
and my service returns back a JSON response which I use it to make a DataResponse
object and return this DataResponse
object back to them.
Below is my DataClient
class used by customer by passing DataRequest
object to it. I am using timeout value passed by customer in DataRequest
to timeout the request if it is taking too much time in getSyncData
method.
public class DataClient implements Client {
private RestTemplate restTemplate = new RestTemplate();
// first executor
private ExecutorService service = Executors.newFixedThreadPool(15);
@Override
public DataResponse getSyncData(DataRequest key) {
DataResponse response = null;
Future<DataResponse> responseFuture = null;
try {
responseFuture = getAsyncData(key);
response = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
} catch (TimeoutException ex) {
response = new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR);
responseFuture.cancel(true);
// logging exception here
}
return response;
}
@Override
public Future<DataResponse> getAsyncData(DataRequest key) {
DataFetcherTask task = new DataFetcherTask(key, restTemplate);
Future<DataResponse> future = service.submit(task);
return future;
}
}
DataFetcherTask
class:
public class DataFetcherTask implements Callable<DataResponse> {
private DataRequest key;
private RestTemplate restTemplate;
public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
this.key = key;
this.restTemplate = restTemplate;
}
@Override
public DataResponse call() throws Exception {
// In a nutshell below is what I am doing here.
// 1. Make an url using DataRequest key.
// 2. And then execute the url RestTemplate.
// 3. Make a DataResponse object and return it.
// I am calling this whole logic in call method as LogicA
}
}
As of now my DataFetcherTask
class is responsible for one DataRequest
key as shown above..
Problem Statement:-
Now I have a small design change. Customer will pass DataRequest
(for example keyA) object to my library and then I will make a new http call to another service (which I am not doing in my current design) by using user id present in DataRequest
(keyA) object which will give me back list of user id's so I will use those user id's and make few other DataRequest
(keyB, keyC, keyD) objects one for each user id returned in the response. And then I will have List<DataRequest>
object which will have keyB, keyC and keyD DataRequest
object. Max element in the List<DataRequest>
will be three, that's all.
Now for each of those DataRequest
object in List<DataRequest>
I want to execute above DataFetcherTask.call
method in parallel and then make List<DataResponse>
by adding each DataResponse
for each key. So I will have three parallel calls to DataFetcherTask.call
. Idea behind this parallel call is to get the data for all those max three keys in the same global timeout value.
So my proposal is - DataFetcherTask
class will return back List<DataResponse>
object instead of DataResponse
and then signature of getSyncData
and getAsyncData
method will change as well. So here is the algorithm:
List<DataRequest>
by calling another HTTP service.DataRequest
in List<DataRequest>
to DataFetcherTask.call
method and return List<DataResponse>
object to customer instead of DataResponse
.With this way, I can apply same global timeout on step 1 along with step 2 as well. If either of above step is taking time, we will just timeout in getSyncData
method.
DataFetcherTask
class after design change:
public class DataFetcherTask implements Callable<List<DataResponse>> {
private DataRequest key;
private RestTemplate restTemplate;
// second executor here
private ExecutorService executorService = Executors.newFixedThreadPool(10);
public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
this.key = key;
this.restTemplate = restTemplate;
}
@Override
public List<DataResponse> call() throws Exception {
List<DataRequest> keys = generateKeys();
CompletionService<DataResponse> comp = new ExecutorCompletionService<>(executorService);
int count = 0;
for (final DataRequest key : keys) {
comp.submit(new Callable<DataResponse>() {
@Override
public DataResponse call() throws Exception {
return performDataRequest(key);
}
});
}
List<DataResponse> responseList = new ArrayList<DataResponse>();
while (count-- > 0) {
Future<DataResponse> future = comp.take();
responseList.add(future.get());
}
return responseList;
}
// In this method I am making a HTTP call to another service
// and then I will make List<DataRequest> accordingly.
private List<DataRequest> generateKeys() {
List<DataRequest> keys = new ArrayList<>();
// use key object which is passed in contructor to make HTTP call to another service
// and then make List of DataRequest object and return keys.
return keys;
}
private DataResponse performDataRequest(DataRequest key) {
// This will have all LogicA code here which is shown in my original design.
// everything as it is same..
}
}
Now my question is -
call
method in another call
method looks weird? I have simplified the code so that idea gets clear what I am trying to do..
As already mentioned in the comments of your question, you can use Java's ForkJoin framework. This will save you the extra thread pool within your DataFetcherTask
.
You simply need to use a ForkJoinPool
in your DataClient
and convert your DataFetcherTask
into a RecursiveTask
(one of ForkJoinTask
's subtypes). This allows you to easily execute other subtasks in parallel.
So, after these modifications your code will look something like this:
DataFetcherTask
The DataFetcherTask
is now a RecursiveTask
which first generates the keys and invokes subtasks for each generated key. These subtasks are executed in the same ForkJoinPool
as the parent task.
public class DataFetcherTask extends RecursiveTask<List<DataResponse>> {
private final DataRequest key;
private final RestTemplate restTemplate;
public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
this.key = key;
this.restTemplate = restTemplate;
}
@Override
protected List<DataResponse> compute() {
// Create subtasks for the key and invoke them
List<DataRequestTask> requestTasks = requestTasks(generateKeys());
invokeAll(requestTasks);
// All tasks are finished if invokeAll() returns.
List<DataResponse> responseList = new ArrayList<>(requestTasks.size());
for (DataRequestTask task : requestTasks) {
try {
responseList.add(task.get());
} catch (InterruptedException | ExecutionException e) {
// TODO - Handle exception properly
Thread.currentThread().interrupt();
return Collections.emptyList();
}
}
return responseList;
}
private List<DataRequestTask> requestTasks(List<DataRequest> keys) {
List<DataRequestTask> tasks = new ArrayList<>(keys.size());
for (DataRequest key : keys) {
tasks.add(new DataRequestTask(key));
}
return tasks;
}
// In this method I am making a HTTP call to another service
// and then I will make List<DataRequest> accordingly.
private List<DataRequest> generateKeys() {
List<DataRequest> keys = new ArrayList<>();
// use key object which is passed in contructor to make HTTP call to another service
// and then make List of DataRequest object and return keys.
return keys;
}
/** Inner class for the subtasks. */
private static class DataRequestTask extends RecursiveTask<DataResponse> {
private final DataRequest request;
public DataRequestTask(DataRequest request) {
this.request = request;
}
@Override
protected DataResponse compute() {
return performDataRequest(this.request);
}
private DataResponse performDataRequest(DataRequest key) {
// This will have all LogicA code here which is shown in my original design.
// everything as it is same..
return new DataResponse(DataErrorEnum.OK, DataStatusEnum.OK);
}
}
}
DataClient
The DataClient
will not change much except for the new thread pool:
public class DataClient implements Client {
private final RestTemplate restTemplate = new RestTemplate();
// Replace the ExecutorService with a ForkJoinPool
private final ForkJoinPool service = new ForkJoinPool(15);
@Override
public List<DataResponse> getSyncData(DataRequest key) {
List<DataResponse> responsList = null;
Future<List<DataResponse>> responseFuture = null;
try {
responseFuture = getAsyncData(key);
responsList = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
} catch (TimeoutException | ExecutionException | InterruptedException ex) {
responsList = Collections.singletonList(new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR));
responseFuture.cancel(true);
// logging exception here
}
return responsList;
}
@Override
public Future<List<DataResponse>> getAsyncData(DataRequest key) {
DataFetcherTask task = new DataFetcherTask(key, this.restTemplate);
return this.service.submit(task);
}
}
Once you are on Java8 you may consider changing the implementation to CompletableFuture
s. Then it would look something like this:
DataClientCF
public class DataClientCF {
private final RestTemplate restTemplate = new RestTemplate();
private final ExecutorService executor = Executors.newFixedThreadPool(15);
public List<DataResponse> getData(DataRequest initialKey) {
return CompletableFuture.supplyAsync(() -> generateKeys(initialKey), this.executor)
.thenApply(requests -> requests.stream().map(this::supplyRequestAsync).collect(Collectors.toList()))
.thenApply(responseFutures -> responseFutures.stream().map(future -> future.join()).collect(Collectors.toList()))
.exceptionally(t -> { throw new RuntimeException(t); })
.join();
}
private List<DataRequest> generateKeys(DataRequest key) {
return new ArrayList<>();
}
private CompletableFuture<DataResponse> supplyRequestAsync(DataRequest key) {
return CompletableFuture.supplyAsync(() -> new DataResponse(DataErrorEnum.OK, DataStatusEnum.OK), this.executor);
}
}
As mentioned in the comments, Guava's ListenableFuture
s would provide similar functionality for Java7 but without Lambdas they tend to get clumsy.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With