
Executing a method in parallel from a call method

I have a library that customers call with a DataRequest object, which carries a user id, a timeout, and some other fields. I use the DataRequest to build a URL, make an HTTP call with RestTemplate, and turn the JSON response from my service into a DataResponse object, which I return to the caller.

Below is the DataClient class that customers call with a DataRequest. In getSyncData, I use the timeout from the DataRequest to abandon the request if it takes too long.

public class DataClient implements Client {

    private RestTemplate restTemplate = new RestTemplate();
    // first executor
    private ExecutorService service = Executors.newFixedThreadPool(15);

    @Override
    public DataResponse getSyncData(DataRequest key) {
        DataResponse response = null;
        Future<DataResponse> responseFuture = null;

        try {
            responseFuture = getAsyncData(key);
            response = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
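        } catch (TimeoutException ex) {
            response = new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR);
            responseFuture.cancel(true);
            // logging exception here
        }
        // Note: Future.get also throws InterruptedException and ExecutionException;
        // their handling is omitted in this simplified version.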

        return response;
    }   

    @Override
    public Future<DataResponse> getAsyncData(DataRequest key) {
        DataFetcherTask task = new DataFetcherTask(key, restTemplate);
        Future<DataResponse> future = service.submit(task);

        return future;
    }
}

DataFetcherTask class:

public class DataFetcherTask implements Callable<DataResponse> {

    private DataRequest key;
    private RestTemplate restTemplate;

    public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
        this.key = key;
        this.restTemplate = restTemplate;
    }

    @Override
    public DataResponse call() throws Exception {
        // In a nutshell, this is what I am doing here:
        // 1. Build a URL from the DataRequest key.
        // 2. Execute the URL with RestTemplate.
        // 3. Build a DataResponse object and return it.

        // I refer to this whole logic in the call method as LogicA.
    }
}

As of now, each DataFetcherTask is responsible for a single DataRequest key, as shown above.

Problem statement:

Now I have a small design change. The customer will pass a DataRequest (for example, keyA) to my library. Using the user id in keyA, I will make a new HTTP call to another service (which my current design does not do), and it will return a list of user ids. For each returned user id I will create another DataRequest (keyB, keyC, keyD), which gives me a List<DataRequest> containing keyB, keyC, and keyD. The list will never hold more than three elements.

For each DataRequest in that List<DataRequest>, I want to run the DataFetcherTask.call logic above in parallel and collect the results into a List<DataResponse>. That means up to three parallel calls. The idea is to fetch the data for all of those keys within the same global timeout.

So my proposal is that DataFetcherTask returns List<DataResponse> instead of DataResponse, and the signatures of getSyncData and getAsyncData change accordingly. Here is the algorithm:

  1. Use the DataRequest passed by the customer to build a List<DataRequest> by calling another HTTP service.
  2. Call DataFetcherTask.call in parallel for each DataRequest in the List<DataRequest>, and return a List<DataResponse> to the customer instead of a single DataResponse.

This way, the same global timeout covers both step 1 and step 2. If either step takes too long, getSyncData simply times out.
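The two steps under one shared deadline can be sketched with a single executor, as below. This is only an illustration under stated assumptions, not the library's actual code: Strings stand in for DataRequest and DataResponse, and SharedDeadlineSketch, fetchAll, and the placeholder bodies of generateKeys and performDataRequest are hypothetical. It relies on ExecutorService.invokeAll with a timeout, which cancels tasks that have not finished when the wait expires.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the library: Strings replace DataRequest/DataResponse.
final class SharedDeadlineSketch {

    static List<String> fetchAll(ExecutorService pool, final String userId,
                                 long timeout, TimeUnit unit) {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        List<String> responses = new ArrayList<>();
        try {
            // Step 1: resolve the related keys, bounded by the full timeout.
            Future<List<String>> keysFuture = pool.submit(new Callable<List<String>>() {
                @Override
                public List<String> call() {
                    return generateKeys(userId);
                }
            });
            List<String> keys;
            try {
                keys = keysFuture.get(timeout, unit);
            } catch (TimeoutException | ExecutionException e) {
                keysFuture.cancel(true);
                return responses; // a real client would return an error response here
            }

            // Step 2: one task per key, bounded by whatever time is left.
            List<Callable<String>> tasks = new ArrayList<>();
            for (final String key : keys) {
                tasks.add(new Callable<String>() {
                    @Override
                    public String call() {
                        return performDataRequest(key);
                    }
                });
            }
            long remainingNanos = Math.max(0L, deadline - System.nanoTime());
            // invokeAll cancels any task that has not completed when the wait expires.
            for (Future<String> f : pool.invokeAll(tasks, remainingNanos, TimeUnit.NANOSECONDS)) {
                try {
                    responses.add(f.get());
                } catch (CancellationException | ExecutionException e) {
                    responses.add("ERROR"); // placeholder for an error DataResponse
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return responses;
    }

    // Placeholder for the HTTP call that returns the related user ids.
    static List<String> generateKeys(String userId) {
        return Arrays.asList(userId + "-1", userId + "-2", userId + "-3");
    }

    // Placeholder for LogicA.
    static String performDataRequest(String key) {
        return "response-for-" + key;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            System.out.println(fetchAll(pool, "keyA", 2, TimeUnit.SECONDS));
        } finally {
            pool.shutdown();
        }
    }
}
```

Because the fan-out is driven from the calling thread rather than from inside a pooled task, this sketch needs only one executor. Whether that fits depends on whether getAsyncData must still return a Future without blocking the caller.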

DataFetcherTask class after design change:

public class DataFetcherTask implements Callable<List<DataResponse>> {

    private DataRequest key;
    private RestTemplate restTemplate;
    // second executor here
    private ExecutorService executorService = Executors.newFixedThreadPool(10);

    public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
        this.key = key;
        this.restTemplate = restTemplate;
    }

    @Override
    public List<DataResponse> call() throws Exception {
        List<DataRequest> keys = generateKeys();
        CompletionService<DataResponse> comp = new ExecutorCompletionService<>(executorService);

        int count = 0;
        for (final DataRequest key : keys) {
            comp.submit(new Callable<DataResponse>() {
                @Override
                public DataResponse call() throws Exception {
                    return performDataRequest(key);
                }
            });
            // track how many results must be collected below
            count++;
        }

        List<DataResponse> responseList = new ArrayList<DataResponse>();
        while (count-- > 0) {
            Future<DataResponse> future = comp.take();
            responseList.add(future.get());
        }
        return responseList;
    }

    // This method makes an HTTP call to another service
    // and builds the List<DataRequest> from its response.
    private List<DataRequest> generateKeys() {
        List<DataRequest> keys = new ArrayList<>();
        // use the key passed in the constructor to make an HTTP call to another service,
        // then build the list of DataRequest objects and return it.
        return keys;
    }       

    private DataResponse performDataRequest(DataRequest key) {
        // All of the LogicA code from my original design goes here,
        // unchanged.
    }
}

Now my questions are:

  • Does it have to be like this? What is the right design for this problem? Having a call method inside another call method looks weird.
  • Do we need two executors, as in my code? Is there a better way to solve this, or some simplification or design change we can make here?

I have simplified the code to make clear what I am trying to do.

asked Dec 16 '15 at 15:12 by john


1 Answer

As already mentioned in the comments on your question, you can use Java's fork/join framework. This saves you the extra thread pool inside your DataFetcherTask.

You simply need to use a ForkJoinPool in your DataClient and convert your DataFetcherTask into a RecursiveTask (one of ForkJoinTask's subtypes). This allows you to easily execute other subtasks in parallel.

So, after these modifications your code will look something like this:

DataFetcherTask

The DataFetcherTask is now a RecursiveTask which first generates the keys and invokes subtasks for each generated key. These subtasks are executed in the same ForkJoinPool as the parent task.

public class DataFetcherTask extends RecursiveTask<List<DataResponse>> {

  private final DataRequest key;
  private final RestTemplate restTemplate;

  public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
      this.key = key;
      this.restTemplate = restTemplate;
  }

  @Override
  protected List<DataResponse> compute() {
    // Create subtasks for the key and invoke them
    List<DataRequestTask> requestTasks = requestTasks(generateKeys());
    invokeAll(requestTasks);

    // All subtasks have finished once invokeAll() returns.
    List<DataResponse> responseList = new ArrayList<>(requestTasks.size());
    for (DataRequestTask task : requestTasks) {
      try {
        responseList.add(task.get());
      } catch (InterruptedException | ExecutionException e) {
        // TODO - Handle exception properly
        Thread.currentThread().interrupt();
        return Collections.emptyList();
      }
    }

    return responseList;
  }

  private List<DataRequestTask> requestTasks(List<DataRequest> keys) {
    List<DataRequestTask> tasks = new ArrayList<>(keys.size());
    for (DataRequest key : keys) {
      tasks.add(new DataRequestTask(key));
    }

    return tasks;
  }

  // This method makes an HTTP call to another service
  // and builds the List<DataRequest> from its response.
  private List<DataRequest> generateKeys() {
      List<DataRequest> keys = new ArrayList<>();
      // use the key passed in the constructor to make an HTTP call to another service,
      // then build the list of DataRequest objects and return it.
      return keys;
  }

  /** Static nested class for the subtasks. */
  private static class DataRequestTask extends RecursiveTask<DataResponse> {

    private final DataRequest request;

    public DataRequestTask(DataRequest request) {
      this.request = request;
    }

    @Override
    protected DataResponse compute() {
      return performDataRequest(this.request);
    }

    private DataResponse performDataRequest(DataRequest key) {
      // All of the LogicA code from the original design goes here,
      // unchanged.
      return new DataResponse(DataErrorEnum.OK, DataStatusEnum.OK);
    }
  }

}

DataClient

The DataClient will not change much except for the new thread pool:

public class DataClient implements Client {

  private final RestTemplate restTemplate = new RestTemplate();
  // Replace the ExecutorService with a ForkJoinPool
  private final ForkJoinPool service = new ForkJoinPool(15);

  @Override
  public List<DataResponse> getSyncData(DataRequest key) {
      List<DataResponse> responseList = null;
      Future<List<DataResponse>> responseFuture = null;

      try {
          responseFuture = getAsyncData(key);
          responseList = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
      } catch (TimeoutException | ExecutionException | InterruptedException ex) {
          if (ex instanceof InterruptedException) {
              // restore the interrupt flag for callers further up the stack
              Thread.currentThread().interrupt();
          }
          // note: every failure, not only a timeout, is reported as CLIENT_TIMEOUT here
          responseList = Collections.singletonList(new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR));
          responseFuture.cancel(true);
          // logging exception here
      }

      return responseList;
  }

  @Override
  public Future<List<DataResponse>> getAsyncData(DataRequest key) {
      DataFetcherTask task = new DataFetcherTask(key, this.restTemplate);
      return this.service.submit(task);
  }
}
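To try the fork/join fan-out in isolation, here is a stripped-down, runnable sketch. It is an illustration under assumptions rather than the code above: Strings stand in for DataRequest and DataResponse, and ForkJoinFanOutSketch, FetchAllTask, FetchOneTask, and the placeholder method bodies are hypothetical names. It uses join() to read subtask results, which avoids the checked exceptions of get() once invokeAll() has returned.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in: Strings replace DataRequest/DataResponse.
final class ForkJoinFanOutSketch {

    /** Parent task: resolves the keys, then runs one subtask per key. */
    static final class FetchAllTask extends RecursiveTask<List<String>> {
        private final String userId;

        FetchAllTask(String userId) {
            this.userId = userId;
        }

        @Override
        protected List<String> compute() {
            List<FetchOneTask> subtasks = new ArrayList<>();
            for (String key : generateKeys(userId)) {
                subtasks.add(new FetchOneTask(key));
            }
            invokeAll(subtasks); // returns once every subtask has completed
            List<String> responses = new ArrayList<>(subtasks.size());
            for (FetchOneTask task : subtasks) {
                responses.add(task.join()); // already done, so this does not block
            }
            return responses;
        }
    }

    /** Subtask: performs the request for a single key. */
    static final class FetchOneTask extends RecursiveTask<String> {
        private final String key;

        FetchOneTask(String key) {
            this.key = key;
        }

        @Override
        protected String compute() {
            return "response-for-" + key; // placeholder for LogicA
        }
    }

    // Placeholder for the HTTP call that returns the related user ids.
    static List<String> generateKeys(String userId) {
        return Arrays.asList(userId + "-1", userId + "-2", userId + "-3");
    }

    /** Submits the parent task and waits at most the given timeout for the result. */
    static List<String> fetch(ForkJoinPool pool, String userId, long timeout, TimeUnit unit) {
        ForkJoinTask<List<String>> future = pool.submit(new FetchAllTask(userId));
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException | ExecutionException e) {
            future.cancel(true);
            return Collections.singletonList("ERROR");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            future.cancel(true);
            return Collections.singletonList("ERROR");
        }
    }

    public static void main(String[] args) {
        System.out.println(fetch(new ForkJoinPool(4), "keyA", 2, TimeUnit.SECONDS));
    }
}
```

One caveat worth checking against the ForkJoinTask documentation: the mayInterruptIfRunning argument of cancel has no effect in the default implementation, so cancel(true) does not interrupt a subtask that is blocked in an HTTP call. The caller stops waiting after the timeout, but the blocked worker thread keeps running until the call returns.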

Once you are on Java 8, you may consider changing the implementation to use CompletableFuture. It would then look something like this:

DataClientCF

public class DataClientCF {

  private final RestTemplate restTemplate = new RestTemplate();
  private final ExecutorService executor = Executors.newFixedThreadPool(15);

  public List<DataResponse> getData(DataRequest initialKey) {
    return CompletableFuture.supplyAsync(() -> generateKeys(initialKey), this.executor)
      .thenApply(requests -> requests.stream().map(this::supplyRequestAsync).collect(Collectors.toList()))
      .thenApply(responseFutures -> responseFutures.stream().map(future -> future.join()).collect(Collectors.toList()))
      .exceptionally(t -> { throw new RuntimeException(t); })
      .join();
  }

  private List<DataRequest> generateKeys(DataRequest key) {
    return new ArrayList<>();
  }

  private CompletableFuture<DataResponse> supplyRequestAsync(DataRequest key) {
    return CompletableFuture.supplyAsync(() -> new DataResponse(DataErrorEnum.OK, DataStatusEnum.OK), this.executor);
  }
}
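A possible variation on the CompletableFuture approach, sketched under assumptions: instead of calling join() on the per-key futures from inside a pool thread, the fan-out is composed with thenCompose and CompletableFuture.allOf, and the caller applies the timeout in get. Strings stand in for DataRequest and DataResponse, and ComposedFetchSketch, fetchAllAsync, and the placeholder bodies are hypothetical names, not part of the answer's code.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

// Hypothetical stand-in: Strings replace DataRequest/DataResponse.
final class ComposedFetchSketch {

    /** Builds the whole pipeline without blocking any thread while it is assembled. */
    static CompletableFuture<List<String>> fetchAllAsync(Executor executor, String userId) {
        return CompletableFuture
            .supplyAsync(() -> generateKeys(userId), executor)
            .thenCompose(keys -> {
                // Start one asynchronous request per key.
                List<CompletableFuture<String>> futures = keys.stream()
                    .map(key -> CompletableFuture.supplyAsync(() -> performDataRequest(key), executor))
                    .collect(Collectors.toList());
                // allOf completes when every request is done, so join() below never blocks.
                return CompletableFuture
                    .allOf(futures.toArray(new CompletableFuture<?>[0]))
                    .thenApply(done -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
            });
    }

    /** Synchronous wrapper that waits at most the given timeout. */
    static List<String> fetchAll(Executor executor, String userId, long timeout, TimeUnit unit) {
        CompletableFuture<List<String>> future = fetchAllAsync(executor, userId);
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException | ExecutionException e) {
            future.cancel(true);
            return Collections.singletonList("ERROR"); // placeholder for an error DataResponse
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            future.cancel(true);
            return Collections.singletonList("ERROR");
        }
    }

    // Placeholder for the HTTP call that returns the related user ids.
    static List<String> generateKeys(String userId) {
        return Arrays.asList(userId + "-1", userId + "-2", userId + "-3");
    }

    // Placeholder for LogicA.
    static String performDataRequest(String key) {
        return "response-for-" + key;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            System.out.println(fetchAll(executor, "keyA", 2, TimeUnit.SECONDS));
        } finally {
            executor.shutdown();
        }
    }
}
```

The design choice here is that no pool thread waits on another task in the same pool, which matters with a small fixed pool where waiting threads could otherwise starve the tasks they wait for.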

As mentioned in the comments, Guava's ListenableFuture provides similar functionality on Java 7, but without lambdas the code tends to get clumsy.

answered Sep 30 '22 at 06:09 by Stefan Ferstl